Explorar el Código

同步任务优化

ck hace 4 años
padre
commit
516097b044
Se han modificado 3 ficheros con 140 adiciones y 128 borrados
  1. 5 21
      app/etl/sync_to_ck_daily.py
  2. 9 107
      app/etl/sync_to_ck_hourly.py
  3. 126 0
      app/etl/sync_to_ck_task.py

+ 5 - 21
app/etl/sync_to_ck_daily.py

@@ -9,30 +9,14 @@ log=logger()
 db = MysqlUtils()
 ck = CkUtils()
 dt = DateUtils()
-
-def sync_campaign(ymd):
-    table = 'daily_vx_campaign'
-    sql = 'select * from ' + table + ' where date="{} 00:00:00" '.format(ymd)
-    data = db.quchen_text.getData(sql)
-    data1 = []
-    for i in data:
-        li = list(i)
-        li[1] = str(li[1])
-        li[5] = round(li[5], 4)
-        li[6] = round(li[6], 4)
-        li[7] = round(li[7], 4)
-        li[8] = round(li[8], 4)
-        li[12] = round(li[12], 4)
-        li[13] = round(li[13], 4)
-        li[15] = round(li[15], 4)
-        data1.append(tuple(li))
-    col = db.quchen_text.getColumn("daily_vx_campaign")
-    ck.execute("alter table order drop  partition '{}' ".format(ymd))
-    ck.insertMany(table, col, tuple(data1))
+from sync_to_ck_task import *
 
 
 def run(ymd):
-    sync_campaign(ymd)
+    daily_vx_campaign(ymd)
+    order(ymd)
+    daily_vx(ymd)
+    daily_qq(ymd)
 
 
 if __name__ == '__main__':

+ 9 - 107
app/etl/sync_to_ck_hourly.py

@@ -10,125 +10,27 @@ dt = DateUtils()
 import sys
 from model.DingTalkUtils import DingTalkUtils
 from datetime import datetime
+from sync_to_ck_task import *
 
 
-def order(ymd):
-    log.debug("sync order")
-    col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
+def run():
 
-    sql ="select * from `order` where date=UNIX_TIMESTAMP('{}') ".format(ymd)
-
-    data = db.quchen_text.getData(sql)
-
-    # 掌中云时间格式转换
-    data1 = []
-    for x in data:
-        li = list(x)
-
-        li[6] = li[6].replace('T',' ').replace('+08:00','').replace('/','-')
-
-        if li[6] == None or li[6] == '':
-            li[6] = '0000-00-00 00:00:00'
-
-        if li[7]==None or li[7]=='':
-            li[7]='0000-00-00 00:00:00'
-        else:
-            li[7] = li[7].replace('T',' ').replace('+08:00','').replace('/','-')
-
-        try:
-            li[6]=str(datetime.strptime(li[6],'%Y-%m-%d %H:%M'))
-            li[7]=str(datetime.strptime(li[7],'%Y-%m-%d %H:%M'))
-            # li[7]=str(datetime.strptime(li[7],'%Y/%m/%d %H:%M:%S'))
-        except:
-            pass
-            # print("======================")
-        #     try:
-        #         li[6] = str(datetime.strptime(li[6][:17], '%Y-%m-%d %H:%M'))
-        #         li[7] = str(datetime.strptime(li[7][:17], '%Y/%m/%d %H:%M'))
-        #     except:
-        #         pass
-                # li[6]='2020-11-01 00:00:00'
-                # li[7]='2020-11-01 00:00:00'
-
-        li[6] =li[6][:19]
-        li[7] =li[7][:19]
-
-        li[9] = '' if li[9] is None else li[9]
-        li[5] = '' if li[5] is None else li[5]
-        li[1] = '' if li[1] is None else li[1]
-
-        data1.append(tuple(li))
-
-    # 删除分区
-    ck.execute("alter table order drop  partition '{}' ".format(ymd))
-    ck.insertMany("order", col, tuple(data1))
-
-
-def daily_qq(mydt):
-    log.debug("sync daily_qq")
-    table='daily_qq'
-    col=db.quchen_text.getColumn("daily_qq")
-    sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
-    data = db.quchen_text.getData(sql)
-    li1 = []
-
-    for i in data:
-        li2 = list(i)
-        li2[1] = str(li2[1])
-        li2[12]=round(i[12],4)
-        li1.append(tuple(li2))
-    ck.execute("alter table {} drop  partition '{}' ".format(table, mydt))
-    ck.insertMany(table, col, li1)
-
-
-def daily_vx(mydt):
-    log.debug("sync daily_vx")
-    table='daily_vx'
-    col=db.quchen_text.getColumn(table)
-    sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
-    data = db.quchen_text.getData(sql)
-    # print(len(data))
-    li1 = []
-
-
-    for i in data:
-        li2 = list(i)
-        li2[1] = str(li2[1])
-        li2[5] = round(i[5],4)
-        li2[6] = round(i[6],4)
-        li2[7] = round(i[7],4)
-        li2[8] = round(i[8],4)
-        li2[10] = round(i[10],4)
-        li1.append(tuple(li2))
-
-    ck.execute("alter table {} drop  partition '{}' ".format(table, mydt))
-    ck.insertMany(table, col, li1)
-
-
-def run(ymd):
-    log.debug("run  "+ymd)
     try:
         order(ymd)
     except:
-        DingTalkUtils.send("[order]同步到ck出错:" + e, '18860455786')
+        DingTalkUtils.send("[order]同步到ck出错:",'18860455786')
     try:
         daily_qq(ymd)
     except:
-        DingTalkUtils.send("[daily_qq]到ck出错:" + e, '18860455786,')
+        DingTalkUtils.send("[daily_qq]到ck出错:", '18860455786,')
     try:
         daily_vx(ymd)
     except:
-        DingTalkUtils.send("[daily_vx]同步到ck出错:" + e, '18860455786,')
-
+        DingTalkUtils.send("[daily_vx]同步到ck出错", '18860455786,')
 
-if __name__ == '__main__':
 
-    if sys.argv.__len__()>1:
-        run(dt.get_n_days(-1))
 
-    today=dt.get_n_days()
-    run(today)
-    try:
-        run(today)
-    except Exception as e:
-        DingTalkUtils.send("订单同步到ck出错:"+e,'18860455786,')
+if __name__ == '__main__':
+    ymd = dt.get_n_days()
+    log.debug("run  " + ymd)
+    run()

+ 126 - 0
app/etl/sync_to_ck_task.py

@@ -0,0 +1,126 @@
+
+from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DateUtils import DateUtils
+from model.log import logger
+log=logger()
+db = MysqlUtils()
+ck = CkUtils()
+dt = DateUtils()
+from datetime import datetime
+
+
+def order(ymd):
+    log.debug("sync order")
+    col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
+
+    sql ="select * from `order` where date=UNIX_TIMESTAMP('{}') ".format(ymd)
+
+    data = db.quchen_text.getData(sql)
+
+    # 掌中云时间格式转换
+    data1 = []
+    for x in data:
+        li = list(x)
+
+        li[6] = li[6].replace('T',' ').replace('+08:00','').replace('/','-')
+
+        if li[6] == None or li[6] == '':
+            li[6] = '0000-00-00 00:00:00'
+
+        if li[7]==None or li[7]=='':
+            li[7]='0000-00-00 00:00:00'
+        else:
+            li[7] = li[7].replace('T',' ').replace('+08:00','').replace('/','-')
+
+        try:
+            li[6]=str(datetime.strptime(li[6],'%Y-%m-%d %H:%M'))
+            li[7]=str(datetime.strptime(li[7],'%Y-%m-%d %H:%M'))
+            # li[7]=str(datetime.strptime(li[7],'%Y/%m/%d %H:%M:%S'))
+        except:
+            pass
+            # print("======================")
+        #     try:
+        #         li[6] = str(datetime.strptime(li[6][:17], '%Y-%m-%d %H:%M'))
+        #         li[7] = str(datetime.strptime(li[7][:17], '%Y/%m/%d %H:%M'))
+        #     except:
+        #         pass
+                # li[6]='2020-11-01 00:00:00'
+                # li[7]='2020-11-01 00:00:00'
+
+        li[6] =li[6][:19]
+        li[7] =li[7][:19]
+
+        li[9] = '' if li[9] is None else li[9]
+        li[5] = '' if li[5] is None else li[5]
+        li[1] = '' if li[1] is None else li[1]
+
+        data1.append(tuple(li))
+
+    # 删除分区
+    ck.execute("alter table order drop  partition '{}' ".format(ymd))
+    ck.insertMany("order", col, tuple(data1))
+
+
+def daily_qq(mydt):
+    log.debug("sync daily_qq")
+    table='daily_qq'
+    col=db.quchen_text.getColumn("daily_qq")
+    sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
+    data = db.quchen_text.getData(sql)
+    li1 = []
+
+    for i in data:
+        li2 = list(i)
+        li2[1] = str(li2[1])
+        li2[12]=round(i[12],4)
+        li1.append(tuple(li2))
+    ck.execute("alter table {} drop  partition '{}' ".format(table, mydt))
+    ck.insertMany(table, col, li1)
+
+
+def daily_vx(mydt):
+    log.debug("sync daily_vx")
+    table='daily_vx'
+    col=db.quchen_text.getColumn(table)
+    sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
+    data = db.quchen_text.getData(sql)
+    # print(len(data))
+    li1 = []
+
+
+    for i in data:
+        li2 = list(i)
+        li2[1] = str(li2[1])
+        li2[5] = round(i[5],4)
+        li2[6] = round(i[6],4)
+        li2[7] = round(i[7],4)
+        li2[8] = round(i[8],4)
+        li2[10] = round(i[10],4)
+        li1.append(tuple(li2))
+
+    ck.execute("alter table {} drop  partition '{}' ".format(table, mydt))
+    ck.insertMany(table, col, li1)
+
+
+def daily_vx_campaign(ymd):
+    table = 'daily_vx_campaign'
+    sql = 'select * from ' + table + ' where date="{} 00:00:00" '.format(ymd)
+    data = db.quchen_text.getData(sql)
+    data1 = []
+    for i in data:
+        li = list(i)
+        li[1] = str(li[1])
+        li[5] = round(li[5], 4)
+        li[6] = round(li[6], 4)
+        li[7] = round(li[7], 4)
+        li[8] = round(li[8], 4)
+        li[12] = round(li[12], 4)
+        li[13] = round(li[13], 4)
+        li[15] = round(li[15], 4)
+        data1.append(tuple(li))
+    col = db.quchen_text.getColumn("daily_vx_campaign")
+    ck.execute("alter table order drop  partition '{}' ".format(ymd))
+    ck.insertMany(table, col, tuple(data1))
+
+
+