"""同步到 ck 小时调度""" from model.DataBaseUtils import MysqlUtils,CkUtils from model.DateUtils import DateUtils from model.log import logger log=logger() db = MysqlUtils() ck = CkUtils() dt = DateUtils() import sys from model.DingTalkUtils import DingTalkUtils from datetime import datetime def order(ymd): log.debug("sync order") col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id" sql ="select * from `order` where date=UNIX_TIMESTAMP('{}') ".format(ymd) data = db.quchen_text.getData(sql) # 掌中云时间格式转换 data1 = [] for x in data: li = list(x) li[6] = li[6].replace('T',' ').replace('+08:00','').replace('/','-') if li[6] == None or li[6] == '': li[6] = '0000-00-00 00:00:00' if li[7]==None or li[7]=='': li[7]='0000-00-00 00:00:00' else: li[7] = li[7].replace('T',' ').replace('+08:00','').replace('/','-') try: li[6]=str(datetime.strptime(li[6],'%Y-%m-%d %H:%M')) li[7]=str(datetime.strptime(li[7],'%Y-%m-%d %H:%M')) # li[7]=str(datetime.strptime(li[7],'%Y/%m/%d %H:%M:%S')) except: pass # print("======================") # try: # li[6] = str(datetime.strptime(li[6][:17], '%Y-%m-%d %H:%M')) # li[7] = str(datetime.strptime(li[7][:17], '%Y/%m/%d %H:%M')) # except: # pass # li[6]='2020-11-01 00:00:00' # li[7]='2020-11-01 00:00:00' li[6] =li[6][:19] li[7] =li[7][:19] li[9] = '' if li[9] is None else li[9] li[5] = '' if li[5] is None else li[5] li[1] = '' if li[1] is None else li[1] data1.append(tuple(li)) # 删除分区 ck.execute("alter table order drop partition '{}' ".format(ymd)) ck.insertMany("order", col, tuple(data1)) def daily_qq(mydt): log.debug("sync daily_qq") table='daily_qq' col=db.quchen_text.getColumn("daily_qq") sql = "select * from {} where date='{} 00:00:00'".format(table,mydt) data = db.quchen_text.getData(sql) li1 = [] for i in data: li2 = list(i) li2[1] = str(li2[1]) li2[12]=round(i[12],4) li1.append(tuple(li2)) ck.execute("alter table {} drop partition '{}' ".format(table, mydt)) ck.insertMany(table, col, li1) def daily_vx(mydt): log.debug("sync daily_vx") table='daily_vx' col=db.quchen_text.getColumn(table) sql = "select * from {} where date='{} 00:00:00'".format(table,mydt) data = db.quchen_text.getData(sql) # print(len(data)) li1 = [] for i in data: li2 = list(i) li2[1] = str(li2[1]) li2[5] = round(i[5],4) li2[6] = round(i[6],4) li2[7] = round(i[7],4) li2[8] = round(i[8],4) li2[10] = round(i[10],4) li1.append(tuple(li2)) ck.execute("alter table {} drop partition '{}' ".format(table, mydt)) ck.insertMany(table, col, li1) def run(ymd): log.debug("run "+ymd) try: order(ymd) except: DingTalkUtils.send("[order]同步到ck出错:" + e, '18860455786') try: daily_qq(ymd) except: DingTalkUtils.send("[daily_qq]到ck出错:" + e, '18860455786,') try: daily_vx(ymd) except: DingTalkUtils.send("[daily_vx]同步到ck出错:" + e, '18860455786,') if __name__ == '__main__': if sys.argv.__len__()>1: run(dt.get_n_days(-1)) today=dt.get_n_days() run(today) try: run(today) except Exception as e: DingTalkUtils.send("订单同步到ck出错:"+e,'18860455786,')