from model.DataBaseUtils import MysqlUtils, CkUtils from datetime import datetime, timedelta, timezone from model.DateUtils import DateUtils import logging import time db = MysqlUtils() ck = CkUtils() dt = DateUtils() def platform_data_sum(ymd): logging.info('dw_daily_platform_cost开始数据更新') ck.execute("alter table game_data.dw_daily_platform_cost drop partition '{}' ".format(ymd)) sql = f''' insert into game_data.dw_daily_platform_cost select * from game_data.dw_daily_channel_cost b where dt='{ymd}' ''' ck.execute(sql) logging.info('dw_daily_platform_cost数据更新,结束') def dw_daily_channel_cost(ymd): def table_name(datatime_tmp, datatime_realtime): datatime_use = min(datatime_tmp, datatime_realtime) str_year = datatime_use.tm_year str_mon = datatime_use.tm_mon str_mon = str_mon if str_mon > 9 else '0' + str(str_mon) res = 'h_log_mem_login_{}{}'.format(str_year, str_mon) return res logging.info("run> dw_daily_channel_cost") datatime_ymd = datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))).timetuple() datatime_ymd_tom = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=1)).timetuple() datatime_ymd_tom_after = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=2)).timetuple() datatime_ymd_seven_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=7)).timetuple() datatime_ymd_fifteen_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=15)).timetuple() datatime_ymd_thirty_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=30)).timetuple() datatime_ymd_fortyfive_day = ( datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=45)).timetuple() datatime_ymd_sixty_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=60)).timetuple() datatime_realtime = datetime.now().timetuple() # datatime_str ymd_tom = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=1)).strftime('%Y-%m-%d') ymd_tom_after = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=2)).strftime('%Y-%m-%d') ymd_seven_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=7)).strftime('%Y-%m-%d') ymd_fifteen_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=15)).strftime('%Y-%m-%d') ymd_thirty_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=30)).strftime('%Y-%m-%d') ymd_fortyfive_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=45)).strftime('%Y-%m-%d') ymd_sixty_day = (datetime.strptime(ymd, '%Y-%m-%d').astimezone(timezone(timedelta(hours=8))) + timedelta( days=60)).strftime('%Y-%m-%d') # timestamp timestamp_ymd = time.mktime(datatime_ymd) timestamp_tom = time.mktime(datatime_ymd_tom) # table_name table_name_login_today = table_name(datatime_ymd, datatime_realtime) table_name_login_tom = table_name(datatime_ymd_tom, datatime_realtime) table_name_login_tom_after = table_name(datatime_ymd_tom_after, datatime_realtime) table_name_login_seven_day = table_name(datatime_ymd_seven_day, datatime_realtime) table_name_login_fifteen_day = table_name(datatime_ymd_fifteen_day, datatime_realtime) table_name_login_thirty_day = table_name(datatime_ymd_thirty_day, datatime_realtime) table_name_login_fortyfive_day = table_name(datatime_ymd_fortyfive_day, datatime_realtime) table_name_login_sixty_day = table_name(datatime_ymd_sixty_day, datatime_realtime) sql = f""" select x.dt,x.channel,pitcher,stage,x.platform,x.book, ifnull(view_count,0),ifnull(click_count,0), ifnull(follow_user,0),ifnull(cost,0)/100 as cost, ifnull(web_view_count,0) web_view_count, ifnull(platform_view_count,0) platform_view_count, ifnull(web_order_count,0) web_order_count, if(stage ='趣程15期' or stage ='趣程26期' or stage ='趣程30期','GDT','MP') type ,0 require_roi,0 require_mult, ifnull(y.reg_num,0),ifnull(w.create_user_num,0), v.today_active_user_rate, v.second_stay_rate, v.third_stay_rate, v.seven_stay_rate, v.fifteen_stay_rate, v.thirty_stay_rate, v.fortyfive_stay_rate, v.sixty_stay_rate, v.game_user_sum from ( select dt, channel,stage,pitcher,platform,book from channel_info_daily cid where dt='{ymd}' and channel !='' and channel in (select distinct(channel) from channel_by_account_daily cbad where dt='{ymd}' and (type ='GDT' or type='MP') ) ) x -- 只允许渠道MP、GDT left join (select channel,sum(cost) as cost,sum(view_count) as view_count,sum(valid_click_count) as click_count, sum(from_follow_uv) as follow_user, sum(web_view_count) as web_view_count, sum(platform_view_count) as platform_view_count, sum(web_order_count) as web_order_count from (select account_id,cost,view_count,valid_click_count, round(valid_click_count*official_account_follow_rate,0) as from_follow_uv, 0 as web_view_count, 0 as platform_view_count, 0 as web_order_count from daily_vx where date='{ymd} 00:00:00' union select account_id,cost,view_count,valid_click_count,from_follow_uv, ifnull(web_commodity_page_view_count,0) as web_view_count, ifnull(platform_page_view_count,0) as platform_view_count, ifnull(web_order_count,0) as web_order_count from daily_qq where date='{ymd} 00:00:00' ) a left join (select account_id,channel from channel_by_account_daily where dt='{ymd}') b on a.account_id=b.account_id group by channel) z on x.channel=z.channel left join ( select h.name as channel ,DATE(FROM_UNIXTIME(origin.create_time)) as wx_date, count(*) as reg_num from db_mp.h_member origin left join (select a.name,d.app_id,d.agent_id from quchen_text.advertiser_vx a left join db_mp.h_game b on a.book = b.name left join db_mp.mp_mp_conf c on a.name= c.wx_name left join db_mp.mp_conf_agent d on c.id=d.advertiser_conf_id where d.app_id =b.id group by d.app_id ,d.agent_id ) h on origin.app_id = h.app_id and origin.agent_id = h.agent_id where h.name is not null and origin.create_time > {timestamp_ymd} and origin.create_time < {timestamp_tom} group by name,wx_date ) y on x.channel= y.channel left join ( select h.name as channel,DATE(FROM_UNIXTIME(c.create_time)) as wx_date, count(*) as create_user_num from db_mp.h_mg_role a left join db_mp.h_mem_game b on a.mg_mem_id = b.id left join db_mp.h_member c on b.mem_id = c.id left join (select a.name,d.app_id,d.agent_id from quchen_text.advertiser_vx a left join db_mp.h_game b on a.book = b.name left join db_mp.mp_mp_conf c on a.name= c.wx_name left join db_mp.mp_conf_agent d on c.id=d.advertiser_conf_id where d.app_id =b.id group by d.app_id ,d.agent_id ) h on c.app_id = h.app_id and c.agent_id = h.agent_id where h.name is not null and c.create_time >= {timestamp_ymd} and c.create_time <= {timestamp_tom} group by h.name,wx_date order by wx_date desc ) w on x.channel= w.channel left join ( select channel , if(max(d_ct)=0,0,ifnull(max(e_ct),0)/max(d_ct)) as today_active_user_rate, if(sum(a_ct)=0,0,ifnull(sum(b_ct),0)/sum(a_ct)) as second_stay_rate, if(sum(a_ct)=0,0,ifnull(sum(c_ct),0)/sum(a_ct)) as third_stay_rate, if(sum(a_ct)=0,0,ifnull(sum(seven_ct),0)/sum(a_ct)) as seven_stay_rate, if(sum(a_ct)=0,0,ifnull(sum(fifteen_ct),0)/sum(a_ct)) as fifteen_stay_rate, if(sum(a_ct)=0,0,ifnull(sum(thirty_ct),0)/sum(a_ct)) as thirty_stay_rate, if(sum(a_ct)=0,0,ifnull(sum(fortyfive_ct),0)/sum(a_ct)) as fortyfive_stay_rate, if(sum(a_ct)=0,0,ifnull(sum(sixty_ct),0)/sum(a_ct)) as sixty_stay_rate, max(d_ct) game_user_sum from (select h.name as channel , a.ct as a_ct,b.ct as b_ct,c.ct as c_ct,d.ct as d_ct,e.ct as e_ct, seven.ct as seven_ct,fifteen.ct as fifteen_ct,thirty.ct as thirty_ct, fortyfive.ct as fortyfive_ct,sixty.ct as sixty_ct from (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_today} where date = '{ymd}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) a left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_tom} where date = '{ymd_tom}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) b on a.app_id =b.app_id and a.agent_id =b.agent_id left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_tom_after} where date = '{ymd_tom_after}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) c on a.app_id =c.app_id and a.agent_id = c.agent_id left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_seven_day} where date = '{ymd_seven_day}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) seven on a.app_id =seven.app_id and a.agent_id = seven.agent_id left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_fifteen_day} where date = '{ymd_fifteen_day}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) fifteen on a.app_id =fifteen.app_id and a.agent_id = fifteen.agent_id left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_thirty_day} where date = '{ymd_thirty_day}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) thirty on a.app_id =thirty.app_id and a.agent_id = thirty.agent_id left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_fortyfive_day} where date = '{ymd_fortyfive_day}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) fortyfive on a.app_id =fortyfive.app_id and a.agent_id = fortyfive.agent_id left join (select '{ymd}',a.app_id,a.agent_id,count(*) as ct from db_mp.h_member a left join (select distinct(mem_id) from db_mp.{table_name_login_sixty_day} where date = '{ymd_sixty_day}' ) b on a.id=b.mem_id where a.create_time >={timestamp_ymd} and a.create_time <={timestamp_tom} and b.mem_id is not null group by a.app_id ,a.agent_id ) sixty on a.app_id =sixty.app_id and a.agent_id = sixty.agent_id left join (select app_id ,count(*) as ct from db_mp.h_member hm where create_time <={timestamp_tom} group by app_id ) d on a.app_id =d.app_id left join (select count(distinct(mem_id)) as ct,app_id from db_mp.{table_name_login_today} where date = '{ymd}' group by app_id ) e on a.app_id =e.app_id left join (select a.name as name,d.app_id as app_id ,d.agent_id as agent_id from quchen_text.advertiser_vx a left join db_mp.h_game b on a.book = b.name left join db_mp.mp_mp_conf c on a.name= c.wx_name left join db_mp.mp_conf_agent d on c.id=d.advertiser_conf_id where d.app_id =b.id group by d.app_id ,d.agent_id) h on a.app_id = h.app_id and a.agent_id = h.agent_id where h.name is not null) as keep_data group by channel) v on x.channel= v.channel """ print(sql) data = db.quchen_text.get_data_list(sql) data1 = [] col = "dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult,reg_num,create_user_num,today_active_user_rate,second_stay_rate,third_stay_rate,seven_stay_rate,fifteen_stay_rate,thirty_stay_rate,fortyfive_stay_rate,sixty_stay_rate,game_user_sum" for i in data: i[0] = str(i[0]) i[9] = str(i[9]) i[6] = float(i[6]) i[7] = float(i[7]) i[8] = float(i[8]) i[9] = float(i[9]) i[10] = float(i[10]) i[11] = float(i[11]) i[12] = float(i[12]) i[18] = float(i[18]) if i[18] else 0 i[19] = float(i[19]) if i[19] else 0 i[20] = float(i[20]) if i[20] else 0 i[21] = float(i[21]) if i[21] else 0 i[22] = float(i[22]) if i[22] else 0 i[23] = float(i[23]) if i[23] else 0 i[24] = float(i[24]) if i[24] else 0 i[25] = float(i[25]) if i[25] else 0 i[26] = float(i[26]) if i[26] else 0 data1.append(tuple(i)) for _ in data1: print(_) ck.execute(f"alter table game_data.dw_daily_channel_cost drop partition '{ymd}' ") logging.info(len(data1)) ck.insertMany("game_data.dw_daily_channel_cost", col, tuple(data1)) def channel_by_account_daily(ymd): """返回当天消耗账户对应的公众号表""" logging.info("run> channel_by_account_daily") sql = """replace into channel_by_account_daily select '{0}' as dt,a.account_id as account_id, ifnull(ifnull(b.name,a.name),'') as channel,type from (select account_id,name,'GDT' type from advertiser_qq union select account_id,name,'MP' type from advertiser_vx union select advertiser_id,channel,'BYTEDANCE' type from advertiser_bytedance ) a left join (select b.account_id,b.name from (select min(end_time) as end_time,account_id from account_change where end_time>'{0}' GROUP BY account_id) a left join account_change b on a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format( ymd) db.quchen_text.execute(sql) def channel_info_daily(ymd): """获取公众号某天的期数,投手,平台,书籍 @ return [[]] """ # 获取现在的全量公众号信息 logging.info("run> channel_info_daily") sql = f"""select '{ymd}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from ( select name from advertiser_vx where name is not null group by name-- 公众号全量表 union select name from account_change group by name union select channel as name from pitcher_change group by channel union select name from platform_change group by name union select name from book_change group by name) a left join ( select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher, ifnull(platform,'') platform,ifnull(book,'') book from advertiser_vx where name is not null and start_date <= '{ymd}' and if(end_date is null,1,end_date >= '{ymd}') group by name,stage,pitcher,platform,book ) b on a.name=b.name """ data = db.quchen_text.get_data_list(sql) pitcher_change = db.quchen_text.getData( "select b.channel as channel,pitcher from " "(select max(start_time) as start_time,channel from pitcher_change " " where start_time<='{}' GROUP BY channel) a" " left join pitcher_change b on a.start_time=b.start_time and a.channel=b.channel".format(ymd)) platform_change = db.quchen_text.getData( "select b.name as channel,current_platform as platform from (select max(change_date) as change_date,name from platform_change " "where change_date<='{}' GROUP BY name) a " "left join platform_change b on a.change_date=b.change_date and a.name=b.name".format(ymd)) book_change = db.quchen_text.getData( "select b.name as channel,book from (select max(start_time) as start_time,name from book_change " "where start_time<='{}' GROUP BY name) a " "left join book_change b on a.start_time=b.start_time and a.name=b.name".format(ymd)) stage_change = db.quchen_text.getData( "select channel,stage from (select max(start_date) as start_date,channel from stage_change " "where start_date<='{}' GROUP BY channel) a " "left join stage_change using(start_date,channel)".format(ymd)) for i in data: for j in pitcher_change: if i[1] == j[0]: i[3] = j[1] for k in platform_change: if i[1] == k[0]: i[4] = k[1] for h in book_change: if i[1] == h[0]: i[5] = h[1] for m in stage_change: if i[1] == m[0]: i[2] = m[1] insert_sql = "replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) " db.quchen_text.executeMany(insert_sql, data) def ods_order(dt): sql = """ replace into ods_order select case platform when '掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d') when '掌读' then from_unixtime(order_time, '%Y-%m-%d') ELSE order_time end date, stage,platform,channel,channel_id,user_id, case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s') when platform='掌读' then from_unixtime(order_time, '%Y-%m-%d %H:%i:%s') ELSE order_time end order_time, case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(reg_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s') when platform='掌读' then from_unixtime(reg_time, '%Y-%m-%d %H:%i:%s') ELSE reg_time end reg_time, amount,from_novel,order_id,2 from `order` where date=UNIX_TIMESTAMP('{}') """.format(dt) db.quchen_text.execute(sql) def order_account_text(): db.quchen_text.execute("truncate order_account_text") with open('./wending_account_config.csv', encoding='utf-8') as f: for i in f.readlines(): db.quchen_text.execute("insert into order_account_text(platform,text) values ('文鼎','{}')".format(i)) if __name__ == '__main__': # channel_info_daily('2021-02-06') # dw_daily_channel_cost('2021-10-11') # exit() # channel_by_account_daily('2021-02-05') for i in dt.getDateLists('2021-09-08', '2021-11-13'): print(i) channel_by_account_daily(i) channel_info_daily(i) dw_daily_channel_cost(i) # ods_order('2021-05-06') platform_data_sum(i)