"""Driver script for the order/cost ETL steps.

Runs the per-day order and cost steps over a date range, then the
aggregate (dw/dm) steps, and reports failures through DingTalk.
The individual ETL steps are defined in the imported project modules.
"""
import logging
from logging import handlers  # makes ``logging.handlers`` available below

from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from app.etl.data_stat_task import *
from app.etl.sync_to_ck_task import *
from app.etl.dm.dm_pitcher_daily_overview import dm_pitcher_daily_overview
from app.etl.dw.dw_channel_daily import dw_channel_daily
from app.etl.dw.dw_pitcher_daily import dw_pitcher_trend
from app.etl.src.src_book_info import src_book_info
from app.etl.dw.dw_book_trend import book_trend
from app.etl.src import book_annual_expect_profit

du = DateUtils()
logger = logging.getLogger("")
# logger.setLevel(logging.ERROR)
# logging.getLogger().setLevel(logging.WARNING)


def do_order(st, et):
    """Call ``order_sync_ck`` once for every date in ``[st, et]``.

    Args:
        st: Range start, passed unchanged to ``du.getDateLists``.
        et: Range end, passed unchanged to ``du.getDateLists``.
    """
    logging.info('订单数据同步到ck,开始')
    for day in du.getDateLists(st, et):
        logging.info('订单:%s', day)
        order_sync_ck(day)
    logging.info('订单数据同步到ck,结束')


def do_cost(st, et):
    """Run the per-day cost steps for every date in ``[st, et]``.

    For each date the steps run in this fixed order:
    ``channel_by_account_daily``, ``channel_info_daily``,
    ``dw_daily_channel_cost``, ``platform_data_sum``.

    Args:
        st: Range start, passed unchanged to ``du.getDateLists``.
        et: Range end, passed unchanged to ``du.getDateLists``.
    """
    logging.info('消耗数据处理,开始')
    for day in du.getDateLists(st, et):
        logging.info('消耗:%s', day)
        channel_by_account_daily(day)
        channel_info_daily(day)
        dw_daily_channel_cost(day)
        # dw_daily_bytedance_cost(day)
        platform_data_sum(day)
    logging.info('消耗数据处理,结束')


def main(st, et):
    """Run order sync, cost processing and the aggregate steps.

    Any exception is logged with its traceback and reported through
    DingTalk; it is not re-raised, so callers never see a failure.

    Args:
        st: Range start for ``do_order`` / ``do_cost``.
        et: Range end for ``do_order`` / ``do_cost``.
    """
    try:
        do_order(st, et)
        do_cost(st, et)
        # src_book_info()  # book paywall-point info
        # book_annual_expect_profit.run()  # annual expected profit
        dw_channel_daily()
        dw_pitcher_trend()
        book_trend()
        dm_pitcher_daily_overview()
    except Exception as e:
        # logging.exception keeps the traceback, which logging.error(e) dropped.
        logging.exception(e)
        # Second argument is presumably the contact to notify -- TODO confirm.
        DingTalkUtils().send("hourlyRun fail!! \n" + str(e), '15168342316')


def hourly():
    """Run ``main`` for the single date from ``du.getTodayOrYestoday()``.

    Afterwards, re-run ``dw_daily_channel_cost`` and ``platform_data_sum``
    for the dates from 30 days ago through yesterday.
    """
    thedate = du.getTodayOrYestoday()
    main(thedate, thedate)
    # TODO: special case -- the created-role count keeps changing, so the
    #       data has to be re-run every time; this becomes unnecessary once
    #       flink-cdc is introduced.
    # TODO: for now, keep the created-role numbers correct for the last 30 days.
    for day in du.getDateLists(du.get_n_days(-30), du.get_n_days(-1)):
        dw_daily_channel_cost(day)
        platform_data_sum(day)


def daily():
    """Re-run the order and cost steps for the previous 10 days.

    Afterwards, re-run ``dw_daily_channel_cost`` and ``platform_data_sum``
    for the dates from 30 days ago through yesterday.

    NOTE(review): unlike ``main``, errors here propagate to the caller and
    no DingTalk notification is sent. The final loop also repeats the two
    steps for the 10 days ``do_cost`` already covered.
    """
    st = du.get_n_days(-10)
    et = du.get_n_days(-1)
    do_order(st, et)
    do_cost(st, et)
    # TODO: special case -- the created-role count keeps changing, so the
    #       data has to be re-run every time; this becomes unnecessary once
    #       flink-cdc is introduced.
    # TODO: for now, keep the created-role numbers correct for the last 30 days.
    for day in du.getDateLists(du.get_n_days(-30), et):
        dw_daily_channel_cost(day)
        platform_data_sum(day)


if __name__ == '__main__':
    # RotatingFileHandler opens the file immediately, so ./log must exist.
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler(
                './log/data_stat_run.log',
                maxBytes=10 * 1024 * 1024,
                backupCount=5,
                encoding='utf-8',
            ),
            logging.StreamHandler(),  # for console output
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s",
    )

    main(du.get_n_days(-200), du.getNow())
    # hourly()
    # daily()
    # daily('2021-06-30','2021-07-04')
    # do_order('2021-05-30', '2021-06-29')