"""Daily warehouse jobs for channel (official account) cost and order data.

Each job is a plain function that builds one SQL statement and runs it through
the shared MySQL (``db``) or ClickHouse (``ck``) helper objects created below.
"""
from model.DataBaseUtils import MysqlUtils,CkUtils
from model.DateUtils import DateUtils
from model.log import logger
log=logger()
db = MysqlUtils()
ck = CkUtils()
dt = DateUtils()
from datetime import datetime
from sync_to_ck_task import dw_order_channel_cost_sync_ck


def dw_daily_channel_cost(ymd) -> None:
    """Rebuild the ``dw_daily_channel_cost`` rows for one day.

    Starts from the non-empty channels in ``channel_info_daily`` for ``ymd``
    and left-joins:

    * ad metrics summed per channel from ``daily_vx`` and ``daily_qq``
      (accounts are mapped to channels via ``channel_by_account_daily``);
    * the channel ``type`` from ``channel_by_account_daily``;
    * ``require_roi`` / ``require_mult`` from ``ods_book_info_daily``.

    Missing metrics default to 0 and ``cost`` is divided by 100
    (presumably cents -> currency units; confirm against the source tables).

    Args:
        ymd: Date interpolated into the SQL, compared against ``dt`` and as
            ``'<ymd> 00:00:00'`` against ``date``.
    """
    # NOTE(review): sub-query ``k`` is not filtered by dt, so a channel that has
    # ever appeared with more than one ``type`` yields more than one joined row.
    sql = """replace into dw_daily_channel_cost
        select x.dt,x.channel,pitcher,stage,x.platform,x.book,
               ifnull(view_count,0),ifnull(click_count,0),ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
               ifnull(web_view_count,0) web_view_count,
               ifnull(platform_view_count,0) platform_view_count,
               ifnull(web_order_count,0) web_order_count,
               ifnull(type,''),ifnull(require_roi,0),ifnull(require_mult,0)
                 from
        (select dt,channel,stage,pitcher,platform,book from channel_info_daily where dt='{0}' and channel!='') x
          left join
        (select channel,sum(cost) as cost,sum(view_count) as view_count,sum(valid_click_count) as click_count,sum(from_follow_uv)  as follow_user,
         sum(web_view_count) as web_view_count,sum(platform_view_count) as platform_view_count,sum(web_order_count) as web_order_count
         from
            (select account_id,cost,view_count,valid_click_count,round(valid_click_count*official_account_follow_rate,0) as from_follow_uv,
            0 as  web_view_count,
            0 as platform_view_count,
            0 as web_order_count
             from daily_vx where date='{0} 00:00:00'
            union
            select account_id,cost,view_count,valid_click_count,from_follow_uv,
            ifnull(web_commodity_page_view_count,0) as  web_view_count,
             ifnull(platform_page_view_count,0) as platform_view_count,
            ifnull(web_order_count,0) as web_order_count
             from daily_qq where date='{0} 00:00:00') a
        left join
        (select account_id,channel from channel_by_account_daily where dt='{0}') b on a.account_id=b.account_id  group by channel)
         z on x.channel=z.channel
         left join (SELECT channel,type from channel_by_account_daily GROUP By channel,type) k on x.channel=k.channel
        left join (
        select dt,book,platform,require_roi,require_mult from ods_book_info_daily
         )m on x.book=m.book and x.platform=m.platform and x.dt=m.dt
        """.format(ymd)
    print(sql)
    db.quchen_text.execute(sql)


def channel_by_account_daily(ymd) -> None:
    """Rebuild the account -> channel mapping in ``channel_by_account_daily`` for one day.

    Every account in ``advertiser_qq`` / ``advertiser_vx`` gets one row tagged
    with its type (``'qq'`` or ``'vx'``).  The channel is the ``account_change``
    name with the earliest ``end_time`` later than ``ymd`` when one exists,
    otherwise the advertiser's current name, otherwise ``''``.

    Args:
        ymd: Date written to the ``dt`` column and compared with ``end_time``.
    """
    sql = """replace into channel_by_account_daily
             select  '{0}' as dt,a.account_id as account_id, ifnull(ifnull(b.name,a.name),'') as channel,type  from
            (select  account_id,name,'qq' as type from advertiser_qq
              union
              select account_id,name,'vx' as type from advertiser_vx
             ) a
                left join
            (select b.account_id,b.name from
            (select min(end_time) as end_time,account_id from account_change where end_time>'{0}' GROUP BY account_id) a
            left join account_change  b on  a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format(ymd)
    print(sql)
    db.quchen_text.execute(sql)


def _apply_channel_overrides(rows, changes, column):
    """Set ``row[column]`` to the value ``changes`` records for the row's channel.

    ``changes`` is a sequence of ``(channel, value)`` rows; when a channel
    appears more than once the last row wins, matching a sequential scan.
    Rows whose channel (``row[1]``) has no entry are left untouched.
    """
    latest = {change[0]: change[1] for change in changes}
    for row in rows:
        channel = row[1]
        if channel in latest:
            row[column] = latest[channel]


def channel_info_daily(ymd) -> None:
    """Snapshot each channel's stage, pitcher, platform and book for one day.

    Loads the current attributes of every known channel name, overrides
    pitcher / platform / book with the most recent change record dated on or
    before ``ymd``, and writes the rows to ``channel_info_daily``.
    Nothing is returned.

    Args:
        ymd: Date written to the ``dt`` column and used as the upper bound when
            picking the latest change records.
    """
    # Current full list of channels with their present-day attributes.
    # Row layout: [dt, name, stage, pitcher, platform, book].
    sql = """select '{}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
            select  name from advertiser_vx  where name is not null group by name--  公众号全量表
            union
            select  name from advertiser_qq where name is not null group by name
            union
            select  name from account_change group by name
            union
            select channel as name   from pitcher_change group by channel
            union
            select  name from platform_change group by name
            union
            select  name from book_change group by name) a left join (
            select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,ifnull(platform,'') platform,ifnull(book,'') book from advertiser_qq where name is not null  group by name,stage,pitcher,platform,book
             union
                select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,ifnull(platform,'') platform,ifnull(book,'') book from advertiser_vx where name is not null and name !=''
                ) b on a.name=b.name
                """.format(ymd)
    data = db.quchen_text.get_data_list(sql)

    # Latest change record per channel dated on or before ymd.
    pitcher_change = db.quchen_text.getData(
        "select b.channel as channel,pitcher from "
        "(select max(start_time) as start_time,channel from pitcher_change "
        " where start_time<='{}' GROUP BY channel) a"
        " left join pitcher_change  b on  a.start_time=b.start_time and a.channel=b.channel".format(ymd))
    platform_change = db.quchen_text.getData(
        "select b.name as channel,current_platform as platform from (select max(change_date) as change_date,name from platform_change "
        "where change_date<='{}' GROUP BY name) a "
        "left join platform_change  b on  a.change_date=b.change_date and a.name=b.name".format(ymd))
    book_change = db.quchen_text.getData(
        "select b.name as channel,book from (select max(start_time) as start_time,name from book_change "
        "where start_time<='{}' GROUP BY name) a "
        "left join book_change  b on  a.start_time=b.start_time and a.name=b.name".format(ymd))

    # Historical values take precedence over the present-day attributes.
    _apply_channel_overrides(data, pitcher_change, 3)
    _apply_channel_overrides(data, platform_change, 4)
    _apply_channel_overrides(data, book_change, 5)

    print(data)
    insert_sql = "replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) "
    db.quchen_text.executeMany(insert_sql, data)


def ods_order(dt) -> None:
    """Copy one day's rows from ``order`` into ``ods_order`` with normalised times.

    ``order_time`` / ``reg_time`` are converted per platform: '掌中云' values are
    parsed from ``%Y-%m-%dT%H:%i:%s`` text, '掌读' values are treated as unix
    timestamps, and all other platforms are passed through unchanged.

    Args:
        dt: Date passed to ``UNIX_TIMESTAMP`` to select rows by ``date``.
            The name shadows the module-level ``dt`` helper inside this
            function; it is kept for backward compatibility with callers.
    """
    sql = """replace into ods_order
            select
            case platform when '掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d')
            when '掌读' then from_unixtime(order_time, '%Y-%m-%d')
            ELSE order_time end  date,
            stage,platform,channel,channel_id,user_id,
            case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s')
            when platform='掌读' then from_unixtime(order_time, '%Y-%m-%d %H:%i:%s')
            ELSE order_time end order_time,
            case when platform='掌中云' then DATE_FORMAT(STR_TO_DATE(reg_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d %H:%i:%s')
            when platform='掌读' then from_unixtime(reg_time, '%Y-%m-%d %H:%i:%s')
            ELSE reg_time end reg_time,
            amount,from_novel,order_id from `order` where date=UNIX_TIMESTAMP('{}')
            """.format(dt)
    db.quchen_text.execute(sql)


def order_account_text() -> None:
    """Reload ``order_account_text`` from ``./wending_account_config.csv``.

    The table is truncated, then every non-blank line of the file is inserted
    as one row with platform '文鼎'.  Values are passed as bound parameters so
    quotes in the file cannot break the statement, and the line terminator is
    stripped so it is not stored as part of the text.
    """
    db.quchen_text.execute("truncate order_account_text")
    with open('./wending_account_config.csv', encoding='utf-8') as f:
        rows = [
            ['文鼎', line.rstrip('\r\n')]
            for line in f
            if line.strip()
        ]
    if rows:
        db.quchen_text.executeMany(
            "insert into order_account_text(platform,text) values (%s,%s)", rows)


def dw_channel_daily_total(ymd) -> None:
    """Rebuild the ClickHouse ``dw_channel_daily_total`` partition for one day.

    For every channel in ``dw_daily_channel_cost`` the cumulative cost up to
    and including ``ymd`` is joined with the cumulative order amount and the
    amount of orders placed on the user's registration date.  The partition
    for ``ymd`` is dropped first so the insert replaces it.

    Args:
        ymd: Date used as the partition value, the ``dt`` column and the
            inclusive upper bound of both aggregations.
    """
    sql = f"""insert into dw_channel_daily_total
            select '{ymd}' dt,channel,total_cost,total_amount,total_first_amount from
            (select channel,sum(cost) total_cost  from dw_daily_channel_cost where dt<='{ymd}'  group by channel)a
            left outer join
            (select  channel,sum(amount) total_amount,
            sum(if(toDate(formatDateTime(reg_time,'%Y-%m-%d'))=date,amount,0)) total_first_amount
            from order where date<='{ymd}' group by channel) b
            on a.channel=b.channel
                 """
    ck.execute(f"alter table dw_channel_daily_total drop partition '{ymd}'")
    print(sql)
    ck.execute(sql)


if __name__ == '__main__':
    # Backfill the account/channel snapshots over the historical range.
    for day in dt.getDateLists('2019-03-18', '2021-01-27'):
        channel_by_account_daily(day)
        channel_info_daily(day)
    # dw_daily_pitcher('2021-01-14')
    # dw_daily_channel_cost('2021-01-28')
    exit(0)
    # dw_channel_daily_total('2020-07-20')
    # channel_by_account_daily('2020-12-17')
    # dw_daily_channel_cost('2020-12-17')
    # dw_order_channel_cost_sync_ck('2020-12-17')
    # exit(0)
    # ods_order('2020-12-20')
    # dw_daily_channel_plus()
    # exit()
    # dw_daily_channel()
    # exit(0)
    # dm_pitcher_daily_page_total()
    # dm_pitcher_daily_page_total()
    # exit(0)
    # dw_channel_daily_total('2021-01-11')
    # dw_daily_channel_cost('2021-01-12')
    # dw_channel_daily_total('2021-01-13')
    # exit(0)
    # dw_daily_channel()
    # # exit(0)
    # for i in dt.getDateLists('2019-03-18','2021-01-14'):
    #     print(i)
        # dw_channel_daily_total(i)
        # dw_daily_pitcher(i)
        # channel_by_account_daily(i)
    #     dw_daily_channel_cost(i)
    #     dw_order_channel_cost_sync_ck(i)
    # dw_daily_channel()
    #     order_sync_ck(today)
    #     # ods_order(i)
        # channel_info_daily(i)
        # dw_daily_channel_cost(i)
        # dw_order_channel_cost_sync_ck(i)
        # dw_channel_daily_total(i)