data_stat_task.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. from model.DataBaseUtils import MysqlUtils, CkUtils
  2. from datetime import datetime, timedelta, timezone
  3. from model.DateUtils import DateUtils
  4. import logging
  5. import time
# Shared helper instances used by the task functions in this module.
# MysqlUtils / CkUtils / DateUtils come from the project's `model` package.
db = MysqlUtils()  # queries in this file go through `db.quchen_text`
ck = CkUtils()  # target of the `game_data.*` partition drops and inserts (presumably ClickHouse -- confirm)
dt = DateUtils()  # `getDateLists` is used by the backfill loop under `__main__`
  9. def platform_data_sum(ymd):
  10. logging.info('dw_daily_platform_cost开始数据更新')
  11. ck.execute("alter table game_data.dw_daily_platform_cost drop partition '{}' ".format(ymd))
  12. sql = f'''
  13. insert into game_data.dw_daily_platform_cost
  14. select * from game_data.dw_daily_channel_cost b where dt='{ymd}'
  15. '''
  16. ck.execute(sql)
  17. logging.info('dw_daily_platform_cost数据更新,结束')
  18. def dw_daily_channel_cost(ymd):
  19. def table_name(datatime_tmp, datatime_realtime):
  20. str_year = min(datatime_tmp.tm_year, datatime_realtime.tm_year)
  21. str_mon = min(datatime_tmp.tm_mon, datatime_realtime.tm_mon)
  22. str_mon = str_mon if str_mon > 9 else '0' + str(str_mon)
  23. res = 'h_log_mem_login_{}{}'.format(str_year, str_mon)
  24. return res
  25. logging.info("run> dw_daily_channel_cost")
  26. datatime_ymd = datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))).timetuple()
  27. datatime_ymd_tom = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta(
  28. days=1)).timetuple()
  29. datatime_ymd_tom_after = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta(
  30. days=2)).timetuple()
  31. datatime_realtime = datetime.now().timetuple()
  32. # datatime_str
  33. ymd_tom = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta(
  34. days=1)).strftime('%Y-%m-%d')
  35. ymd_tom_after = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta(
  36. days=2)).strftime('%Y-%m-%d')
  37. # timestamp
  38. timestamp_ymd = time.mktime(datatime_ymd)
  39. timestamp_tom = time.mktime(datatime_ymd_tom)
  40. # table_name
  41. table_name_login_today = table_name(datatime_ymd, datatime_realtime)
  42. table_name_login_tom = table_name(datatime_ymd_tom, datatime_realtime)
  43. table_name_login_tom_after = table_name(datatime_ymd_tom_after, datatime_realtime)
  44. sql = f"""
  45. select x.dt,x.channel,pitcher,stage,x.platform,x.book,
  46. ifnull(view_count,0),ifnull(click_count,0),
  47. ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
  48. ifnull(web_view_count,0) web_view_count,
  49. ifnull(platform_view_count,0) platform_view_count,
  50. ifnull(web_order_count,0) web_order_count,
  51. if(stage ='趣程15期' or stage ='趣程26期' or stage ='趣程30期','GDT','MP') type
  52. ,0 require_roi,0 require_mult,
  53. ifnull(y.reg_num,0),ifnull(w.create_user_num,0),
  54. v.today_active_user_rate,
  55. v.second_stay_rate,
  56. v.third_stay_rate,
  57. v.game_user_sum
  58. from
  59. ( select dt, channel,stage,pitcher,platform,book from channel_info_daily cid
  60. where dt='{ymd}' and channel !=''
  61. and channel in
  62. (select distinct(channel) from channel_by_account_daily cbad
  63. where dt='{ymd}'
  64. and (type ='GDT' or type='MP')
  65. )
  66. ) x -- 只允许渠道MP、GDT
  67. left join
  68. (select channel,sum(cost) as cost,sum(view_count) as view_count,sum(valid_click_count) as click_count,
  69. sum(from_follow_uv) as follow_user,
  70. sum(web_view_count) as web_view_count,
  71. sum(platform_view_count) as platform_view_count,
  72. sum(web_order_count) as web_order_count
  73. from
  74. (select account_id,cost,view_count,valid_click_count,
  75. round(valid_click_count*official_account_follow_rate,0) as from_follow_uv,
  76. 0 as web_view_count,
  77. 0 as platform_view_count,
  78. 0 as web_order_count
  79. from daily_vx where date='{ymd} 00:00:00'
  80. union
  81. select account_id,cost,view_count,valid_click_count,from_follow_uv,
  82. ifnull(web_commodity_page_view_count,0) as web_view_count,
  83. ifnull(platform_page_view_count,0) as platform_view_count,
  84. ifnull(web_order_count,0) as web_order_count
  85. from daily_qq where date='{ymd} 00:00:00' ) a
  86. left join
  87. (select account_id,channel from channel_by_account_daily where dt='{ymd}') b
  88. on a.account_id=b.account_id group by channel)
  89. z on x.channel=z.channel
  90. left join
  91. (
  92. select c.name as channel ,DATE(FROM_UNIXTIME(origin.create_time)) as wx_date,
  93. count(*) as reg_num
  94. from
  95. db_mp.h_member origin left join
  96. db_mp.mp_conf_agent a on origin.app_id =a.app_id and origin.agent_id = a.agent_id
  97. left join
  98. db_mp.mp_mp_conf b on a.advertiser_conf_id =b.id
  99. left join
  100. quchen_text.advertiser_vx c on b.mp_id =c.wechat_account_id
  101. where c.wechat_account_id is not null
  102. and origin.create_time > {timestamp_ymd} and origin.create_time < {timestamp_tom}
  103. group by name,wx_date
  104. ) y on x.channel= y.channel
  105. left join
  106. (
  107. select f.name as channel,DATE(FROM_UNIXTIME(c.create_time)) as wx_date,
  108. count(*) as create_user_num
  109. from db_mp.h_mg_role a
  110. left join db_mp.h_mem_game b on a.mg_mem_id = b.id
  111. left join db_mp.h_member c on b.mem_id = c.id
  112. left join db_mp.mp_conf_agent d on c.app_id = d.app_id and c.agent_id = d.agent_id
  113. left join db_mp.mp_mp_conf e on d.advertiser_conf_id =e.id
  114. left join quchen_text.advertiser_vx f on e.mp_id = f.wechat_account_id
  115. where f.name is not null
  116. and c.create_time >= {timestamp_ymd} and c.create_time <= {timestamp_tom}
  117. group by f.name,wx_date
  118. order by wx_date desc
  119. ) w on x.channel= w.channel
  120. left join
  121. (
  122. select channel ,
  123. if(sum(d_ct)=0,0,ifnull(sum(e_ct),0)/sum(d_ct)) as today_active_user_rate,
  124. if(sum(a_ct)=0,0,ifnull(sum(b_ct),0)/sum(a_ct)) as second_stay_rate,
  125. if(sum(a_ct)=0,0,ifnull(sum(c_ct),0)/sum(a_ct)) as third_stay_rate,
  126. sum(d_ct) game_user_sum from
  127. (select h.name as channel ,
  128. a.ct as a_ct,b.ct as b_ct,c.ct as c_ct,d.ct as d_ct,e.ct as e_ct
  129. from
  130. (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from
  131. db_mp.h_member a
  132. left join (select distinct(mem_id) from db_mp.{table_name_login_today}
  133. where date = '{ymd}' ) b on a.id=b.mem_id
  134. where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom}
  135. and b.mem_id is not null
  136. group by a.app_id ,a.agent_id ) a
  137. left join
  138. (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from
  139. db_mp.h_member a
  140. left join (select distinct(mem_id) from db_mp.{table_name_login_tom}
  141. where date = '{ymd_tom}' ) b on a.id=b.mem_id
  142. where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom}
  143. and b.mem_id is not null
  144. group by a.app_id ,a.agent_id ) b on a.app_id =b.app_id and a.agent_id =b.agent_id
  145. left join
  146. (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from
  147. db_mp.h_member a
  148. left join (select distinct(mem_id) from db_mp.{table_name_login_tom_after}
  149. where date = '{ymd_tom_after}' ) b on a.id=b.mem_id
  150. where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom}
  151. and b.mem_id is not null
  152. group by a.app_id ,a.agent_id ) c on a.app_id =c.app_id and a.agent_id = c.agent_id
  153. left join
  154. (select app_id ,agent_id ,count(*) as ct from db_mp.h_member hm
  155. where create_time <={timestamp_tom}
  156. group by app_id ,agent_id ) d on a.app_id =d.app_id and a.agent_id =d.agent_id
  157. left join
  158. (select count(distinct(mem_id)) as ct,app_id ,agent_id from db_mp.{table_name_login_today}
  159. where date = '{ymd}'
  160. group by app_id ,agent_id ) e on a.agent_id =e.agent_id and a.app_id =e.app_id
  161. left join db_mp.mp_conf_agent f on a.app_id =f.app_id and a.agent_id =f.agent_id
  162. left join db_mp.mp_mp_conf g on f.advertiser_conf_id = g.id
  163. left join quchen_text.advertiser_vx h on g.mp_id = h.wechat_account_id
  164. where h.name is not null) as keep_data
  165. group by channel)
  166. v on x.channel= v.channel
  167. """
  168. # print(sql)
  169. data = db.quchen_text.get_data_list(sql)
  170. data1 = []
  171. col = "dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult,reg_num,create_user_num,today_active_user_rate,second_stay_rate,third_stay_rate,game_user_sum"
  172. for i in data:
  173. i[0] = str(i[0])
  174. i[9] = str(i[9])
  175. i[6] = float(i[6])
  176. i[7] = float(i[7])
  177. i[8] = float(i[8])
  178. i[9] = float(i[9])
  179. i[10] = float(i[10])
  180. i[11] = float(i[11])
  181. i[12] = float(i[12])
  182. i[18] = float(i[18]) if i[18] else 0
  183. i[19] = float(i[19]) if i[19] else 0
  184. i[20] = float(i[20]) if i[20] else 0
  185. i[21] = float(i[21]) if i[21] else 0
  186. data1.append(tuple(i))
  187. ck.execute(f"alter table game_data.dw_daily_channel_cost drop partition '{ymd}' ")
  188. logging.info(len(data1))
  189. ck.insertMany("game_data.dw_daily_channel_cost", col, tuple(data1))
  190. def channel_by_account_daily(ymd):
  191. """返回当天消耗账户对应的公众号表"""
  192. logging.info("run> channel_by_account_daily")
  193. sql = """replace into channel_by_account_daily
  194. select '{0}' as dt,a.account_id as account_id, ifnull(ifnull(b.name,a.name),'') as channel,type from
  195. (select account_id,name,'GDT' type from advertiser_qq
  196. union
  197. select account_id,name,'MP' type from advertiser_vx
  198. union
  199. select advertiser_id,channel,'BYTEDANCE' type from advertiser_bytedance
  200. ) a
  201. left join
  202. (select b.account_id,b.name from
  203. (select min(end_time) as end_time,account_id from account_change where end_time>'{0}' GROUP BY account_id) a
  204. left join account_change b on a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format(
  205. ymd)
  206. db.quchen_text.execute(sql)
  207. def channel_info_daily(ymd):
  208. """获取公众号某天的期数,投手,平台,书籍
  209. @ return [[]]
  210. """
  211. # 获取现在的全量公众号信息
  212. logging.info("run> channel_info_daily")
  213. sql = f"""select '{ymd}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
  214. select name from advertiser_vx where name is not null group by name-- 公众号全量表
  215. union
  216. select name from account_change group by name
  217. union
  218. select channel as name from pitcher_change group by channel
  219. union
  220. select name from platform_change group by name
  221. union
  222. select name from book_change group by name) a
  223. left join
  224. ( select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,
  225. ifnull(platform,'') platform,ifnull(book,'') book
  226. from advertiser_vx
  227. where name is not null
  228. and start_date <= '{ymd}'
  229. group by name,stage,pitcher,platform,book
  230. ) b on a.name=b.name
  231. """
  232. data = db.quchen_text.get_data_list(sql)
  233. pitcher_change = db.quchen_text.getData(
  234. "select b.channel as channel,pitcher from "
  235. "(select max(start_time) as start_time,channel from pitcher_change "
  236. " where start_time<='{}' GROUP BY channel) a"
  237. " left join pitcher_change b on a.start_time=b.start_time and a.channel=b.channel".format(ymd))
  238. platform_change = db.quchen_text.getData(
  239. "select b.name as channel,current_platform as platform from (select max(change_date) as change_date,name from platform_change "
  240. "where change_date<='{}' GROUP BY name) a "
  241. "left join platform_change b on a.change_date=b.change_date and a.name=b.name".format(ymd))
  242. book_change = db.quchen_text.getData(
  243. "select b.name as channel,book from (select max(start_time) as start_time,name from book_change "
  244. "where start_time<='{}' GROUP BY name) a "
  245. "left join book_change b on a.start_time=b.start_time and a.name=b.name".format(ymd))
  246. stage_change = db.quchen_text.getData(
  247. "select channel,stage from (select max(start_date) as start_date,channel from stage_change "
  248. "where start_date<='{}' GROUP BY channel) a "
  249. "left join stage_change using(start_date,channel)".format(ymd))
  250. for i in data:
  251. for j in pitcher_change:
  252. if i[1] == j[0]:
  253. i[3] = j[1]
  254. for k in platform_change:
  255. if i[1] == k[0]:
  256. i[4] = k[1]
  257. for h in book_change:
  258. if i[1] == h[0]:
  259. i[5] = h[1]
  260. for m in stage_change:
  261. if i[1] == m[0]:
  262. i[2] = m[1]
  263. insert_sql = "replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) "
  264. db.quchen_text.executeMany(insert_sql, data)
  265. def ods_order(dt):
  266. sql = """ replace into ods_order
  267. select
  268. case platform when '掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d')
  269. when '掌读' then from_unixtime(order_time, '%Y-%m-%d')
  270. ELSE order_time end date,
  271. stage,platform,channel,channel_id,user_id,
  272. case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s')
  273. when platform='掌读' then from_unixtime(order_time, '%Y-%m-%d %H:%i:%s')
  274. ELSE order_time end order_time,
  275. case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(reg_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s')
  276. when platform='掌读' then from_unixtime(reg_time, '%Y-%m-%d %H:%i:%s')
  277. ELSE reg_time end reg_time,
  278. amount,from_novel,order_id,2 from `order` where date=UNIX_TIMESTAMP('{}')
  279. """.format(dt)
  280. db.quchen_text.execute(sql)
  281. def order_account_text():
  282. db.quchen_text.execute("truncate order_account_text")
  283. with open('./wending_account_config.csv', encoding='utf-8') as f:
  284. for i in f.readlines():
  285. db.quchen_text.execute("insert into order_account_text(platform,text) values ('文鼎','{}')".format(i))
if __name__ == '__main__':
    # Manual entry point: rebuilds the channel cost table for one hard-coded
    # day and then exits.
    # channel_info_daily('2021-02-06')
    # channel_by_account_daily('2021-02-05')
    dw_daily_channel_cost('2021-11-06')
    exit()
    # NOTE: everything below is unreachable while the exit() call above is
    # present. It is a date-range backfill: for each day, refresh the channel
    # info snapshot, rebuild the channel cost rows, then copy them into the
    # platform cost table.
    for i in dt.getDateLists('2021-09-08', '2021-11-02'):
        print(i)
        channel_info_daily(i)
        dw_daily_channel_cost(i)
        # ods_order('2021-05-06')
        platform_data_sum(i)