data_stat_task.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. from model.DataBaseUtils import MysqlUtils, CkUtils
  2. from datetime import datetime, timedelta, timezone
  3. from model.DateUtils import DateUtils
  4. import logging
  5. import time
  6. db = MysqlUtils()
  7. ck = CkUtils()
  8. dt = DateUtils()
  9. def dw_daily_bytedance_cost(ymd):
  10. logging.info(f'dw_daily_bytedance_cost 数据填充开始')
  11. sql = '''
  12. select x.dt,x.channel,pitcher,stage,x.platform,x.book,
  13. ifnull(view_count,0),ifnull(click_count,0),
  14. ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
  15. ifnull(web_view_count,0) web_view_count,
  16. ifnull(platform_view_count,0) platform_view_count,
  17. ifnull(web_order_count,0) web_order_count,
  18. 'BYTEDANCE' type
  19. ,0 require_roi,0 require_mult
  20. from
  21. ( select dt, channel,stage,pitcher,platform,book from channel_info_daily cid
  22. where dt='{0}' and channel !=''
  23. and channel in
  24. (select distinct(channel) from channel_by_account_daily cbad
  25. where dt='{0}'
  26. and type ='BYTEDANCE'
  27. )
  28. ) x -- 只允许渠道MP、GDT
  29. left join
  30. (select channel,sum(cost) as cost,sum(view_count) as view_count,sum(valid_click_count) as click_count,sum(from_follow_uv) as follow_user,
  31. sum(web_view_count) as web_view_count,sum(platform_view_count) as platform_view_count,sum(web_order_count) as web_order_count
  32. from
  33. (select account_id,cost,view_count,valid_click_count,round(valid_click_count*official_account_follow_rate,0) as from_follow_uv,
  34. 0 as web_view_count,
  35. 0 as platform_view_count,
  36. 0 as web_order_count
  37. from daily_tt where date='{0} 00:00:00' ) a
  38. left join
  39. (select account_id,channel from channel_by_account_daily where dt='{0}') b
  40. on a.account_id=b.account_id group by channel)
  41. z on x.channel=z.channel
  42. '''.format(ymd)
  43. data = db.quchen_text.get_data_list(sql)
  44. data1 = []
  45. col = "dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult"
  46. for i in data:
  47. i[0] = str(i[0])
  48. i[9] = str(i[9])
  49. i[6] = float(i[6])
  50. i[7] = float(i[7])
  51. i[8] = float(i[8])
  52. i[9] = float(i[9])
  53. i[10] = float(i[10])
  54. i[11] = float(i[11])
  55. i[12] = float(i[12])
  56. data1.append(tuple(i))
  57. ck.execute(f"alter table dw_daily_bytedance_cost drop partition '{ymd}' ")
  58. logging.info(len(data1))
  59. ck.insertMany("dw_daily_bytedance_cost", col, tuple(data1))
  60. logging.info(f'dw_daily_bytedance_cost 数据填充结束')
  61. def platform_data_sum(ymd):
  62. logging.info('dw_daily_platform_cost开始数据更新')
  63. ck.execute("alter table game_data.dw_daily_platform_cost drop partition '{}' ".format(ymd))
  64. sql = f'''
  65. insert into game_data.dw_daily_platform_cost
  66. select * from game_data.dw_daily_channel_cost b where dt='{ymd}'
  67. '''
  68. ck.execute(sql)
  69. logging.info('dw_daily_platform_cost数据更新,结束')
  70. def dw_daily_channel_cost(ymd):
  71. logging.info("run> dw_daily_channel_cost")
  72. datatime_ymd = datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))).timetuple()
  73. timestamp_ymd = time.mktime(datatime_ymd)
  74. tomorrow_ymd = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta(
  75. days=1)).timetuple()
  76. timestamp_tom = time.mktime(tomorrow_ymd)
  77. sql = f"""
  78. select x.dt,x.channel,pitcher,stage,x.platform,x.book,
  79. ifnull(view_count,0),ifnull(click_count,0),
  80. ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
  81. ifnull(web_view_count,0) web_view_count,
  82. ifnull(platform_view_count,0) platform_view_count,
  83. ifnull(web_order_count,0) web_order_count,
  84. if(stage ='趣程15期' or stage ='趣程26期' or stage ='趣程30期','GDT','MP') type
  85. ,0 require_roi,0 require_mult,ifnull(y.reg_num,0),ifnull(w.create_user_num,0)
  86. from
  87. ( select dt, channel,stage,pitcher,platform,book from channel_info_daily cid
  88. where dt='{ymd}' and channel !=''
  89. and channel in
  90. (select distinct(channel) from channel_by_account_daily cbad
  91. where dt='{ymd}'
  92. and (type ='GDT' or type='MP')
  93. )
  94. ) x -- 只允许渠道MP、GDT
  95. left join
  96. (select channel,sum(cost) as cost,sum(view_count) as view_count,sum(valid_click_count) as click_count,
  97. sum(from_follow_uv) as follow_user,
  98. sum(web_view_count) as web_view_count,
  99. sum(platform_view_count) as platform_view_count,
  100. sum(web_order_count) as web_order_count
  101. from
  102. (select account_id,cost,view_count,valid_click_count,
  103. round(valid_click_count*official_account_follow_rate,0) as from_follow_uv,
  104. 0 as web_view_count,
  105. 0 as platform_view_count,
  106. 0 as web_order_count
  107. from daily_vx where date='{ymd} 00:00:00'
  108. union
  109. select account_id,cost,view_count,valid_click_count,from_follow_uv,
  110. ifnull(web_commodity_page_view_count,0) as web_view_count,
  111. ifnull(platform_page_view_count,0) as platform_view_count,
  112. ifnull(web_order_count,0) as web_order_count
  113. from daily_qq where date='{ymd} 00:00:00' ) a
  114. left join
  115. (select account_id,channel from channel_by_account_daily where dt='{ymd}') b
  116. on a.account_id=b.account_id group by channel)
  117. z on x.channel=z.channel
  118. left join
  119. (
  120. select c.name as channel ,DATE(FROM_UNIXTIME(origin.create_time)) as wx_date,
  121. count(*) as reg_num
  122. from
  123. db_mp.h_member origin left join
  124. db_mp.mp_conf_agent a on origin.app_id =a.app_id and origin.agent_id = a.agent_id
  125. left join
  126. db_mp.mp_mp_conf b on a.advertiser_conf_id =b.id
  127. left join
  128. quchen_text.advertiser_vx c on b.mp_id =c.wechat_account_id
  129. where c.wechat_account_id is not null
  130. and origin.create_time > {timestamp_ymd} and origin.create_time < {timestamp_tom}
  131. group by name,wx_date
  132. ) y on x.channel= y.channel
  133. left join
  134. (
  135. select f.name as channel,DATE(FROM_UNIXTIME(a.create_time)) as wx_date,
  136. count(*) as create_user_num
  137. from db_mp.h_mg_role a
  138. left join db_mp.h_mem_game b on a.mg_mem_id = b.id
  139. left join db_mp.h_member c on b.mem_id = c.id
  140. left join db_mp.mp_conf_agent d on c.app_id = d.app_id and c.agent_id = d.agent_id
  141. left join db_mp.mp_mp_conf e on d.advertiser_conf_id =e.id
  142. left join quchen_text.advertiser_vx f on e.mp_id = f.wechat_account_id
  143. where f.name is not null
  144. and a.create_time > {timestamp_ymd} and a.create_time < {timestamp_tom}
  145. group by f.name,wx_date
  146. order by wx_date desc
  147. ) w on x.channel= w.channel
  148. """
  149. data = db.quchen_text.get_data_list(sql)
  150. data1 = []
  151. col = "dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult,reg_num,create_user_num"
  152. for i in data:
  153. i[0] = str(i[0])
  154. i[9] = str(i[9])
  155. i[6] = float(i[6])
  156. i[7] = float(i[7])
  157. i[8] = float(i[8])
  158. i[9] = float(i[9])
  159. i[10] = float(i[10])
  160. i[11] = float(i[11])
  161. i[12] = float(i[12])
  162. data1.append(tuple(i))
  163. ck.execute(f"alter table game_data.dw_daily_channel_cost drop partition '{ymd}' ")
  164. logging.info(len(data1))
  165. ck.insertMany("game_data.dw_daily_channel_cost", col, tuple(data1))
  166. def channel_by_account_daily(ymd):
  167. """返回当天消耗账户对应的公众号表"""
  168. logging.info("run> channel_by_account_daily")
  169. sql = """replace into channel_by_account_daily
  170. select '{0}' as dt,a.account_id as account_id, ifnull(ifnull(b.name,a.name),'') as channel,type from
  171. (select account_id,name,'GDT' type from advertiser_qq
  172. union
  173. select account_id,name,'MP' type from advertiser_vx
  174. union
  175. select advertiser_id,channel,'BYTEDANCE' type from advertiser_bytedance
  176. ) a
  177. left join
  178. (select b.account_id,b.name from
  179. (select min(end_time) as end_time,account_id from account_change where end_time>'{0}' GROUP BY account_id) a
  180. left join account_change b on a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format(
  181. ymd)
  182. db.quchen_text.execute(sql)
  183. def channel_info_daily(ymd):
  184. """获取公众号某天的期数,投手,平台,书籍
  185. @ return [[]]
  186. """
  187. # 获取现在的全量公众号信息
  188. logging.info("run> channel_info_daily")
  189. sql = f"""select '{ymd}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
  190. select name from advertiser_vx where name is not null group by name-- 公众号全量表
  191. union
  192. select name from account_change group by name
  193. union
  194. select channel as name from pitcher_change group by channel
  195. union
  196. select name from platform_change group by name
  197. union
  198. select name from book_change group by name) a
  199. left join
  200. ( select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,
  201. ifnull(platform,'') platform,ifnull(book,'') book
  202. from advertiser_vx
  203. where name is not null
  204. and start_date <= '{ymd}'
  205. group by name,stage,pitcher,platform,book
  206. ) b on a.name=b.name
  207. """
  208. data = db.quchen_text.get_data_list(sql)
  209. pitcher_change = db.quchen_text.getData(
  210. "select b.channel as channel,pitcher from "
  211. "(select max(start_time) as start_time,channel from pitcher_change "
  212. " where start_time<='{}' GROUP BY channel) a"
  213. " left join pitcher_change b on a.start_time=b.start_time and a.channel=b.channel".format(ymd))
  214. platform_change = db.quchen_text.getData(
  215. "select b.name as channel,current_platform as platform from (select max(change_date) as change_date,name from platform_change "
  216. "where change_date<='{}' GROUP BY name) a "
  217. "left join platform_change b on a.change_date=b.change_date and a.name=b.name".format(ymd))
  218. book_change = db.quchen_text.getData(
  219. "select b.name as channel,book from (select max(start_time) as start_time,name from book_change "
  220. "where start_time<='{}' GROUP BY name) a "
  221. "left join book_change b on a.start_time=b.start_time and a.name=b.name".format(ymd))
  222. stage_change = db.quchen_text.getData(
  223. "select channel,stage from (select max(start_date) as start_date,channel from stage_change "
  224. "where start_date<='{}' GROUP BY channel) a "
  225. "left join stage_change using(start_date,channel)".format(ymd))
  226. for i in data:
  227. for j in pitcher_change:
  228. if i[1] == j[0]:
  229. i[3] = j[1]
  230. for k in platform_change:
  231. if i[1] == k[0]:
  232. i[4] = k[1]
  233. for h in book_change:
  234. if i[1] == h[0]:
  235. i[5] = h[1]
  236. for m in stage_change:
  237. if i[1] == m[0]:
  238. i[2] = m[1]
  239. insert_sql = "replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) "
  240. db.quchen_text.executeMany(insert_sql, data)
  241. def ods_order(dt):
  242. sql = """ replace into ods_order
  243. select
  244. case platform when '掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d')
  245. when '掌读' then from_unixtime(order_time, '%Y-%m-%d')
  246. ELSE order_time end date,
  247. stage,platform,channel,channel_id,user_id,
  248. case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s')
  249. when platform='掌读' then from_unixtime(order_time, '%Y-%m-%d %H:%i:%s')
  250. ELSE order_time end order_time,
  251. case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(reg_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s')
  252. when platform='掌读' then from_unixtime(reg_time, '%Y-%m-%d %H:%i:%s')
  253. ELSE reg_time end reg_time,
  254. amount,from_novel,order_id,2 from `order` where date=UNIX_TIMESTAMP('{}')
  255. """.format(dt)
  256. db.quchen_text.execute(sql)
  257. def order_account_text():
  258. db.quchen_text.execute("truncate order_account_text")
  259. with open('./wending_account_config.csv', encoding='utf-8') as f:
  260. for i in f.readlines():
  261. db.quchen_text.execute("insert into order_account_text(platform,text) values ('文鼎','{}')".format(i))
  262. if __name__ == '__main__':
  263. # channel_info_daily('2021-02-06')
  264. # channel_by_account_daily('2021-02-05')
  265. for i in dt.getDateLists('2021-04-18', '2021-10-28'):
  266. print(i)
  267. channel_info_daily(i)
  268. dw_daily_channel_cost(i)
  269. # ods_order('2021-05-06')
  270. platform_data_sum(i)