sync_to_ck_hourly.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. """同步到 ck 小时调度"""
  2. from model.DataBaseUtils import MysqlUtils,CkUtils
  3. from model.DateUtils import DateUtils
  4. from model.log import logger
  5. log=logger()
  6. db = MysqlUtils()
  7. ck = CkUtils()
  8. dt = DateUtils()
  9. import sys
  10. from model.DingTalkUtils import DingTalkUtils
  11. from datetime import datetime
  12. def order(ymd):
  13. log.debug("sync order")
  14. col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
  15. sql ="select * from `order` where date=UNIX_TIMESTAMP('{}') ".format(ymd)
  16. data = db.quchen_text.getData(sql)
  17. # 掌中云时间格式转换
  18. data1 = []
  19. for x in data:
  20. li = list(x)
  21. li[6] = li[6].replace('T',' ').replace('+08:00','').replace('/','-')
  22. if li[6] == None or li[6] == '':
  23. li[6] = '0000-00-00 00:00:00'
  24. if li[7]==None or li[7]=='':
  25. li[7]='0000-00-00 00:00:00'
  26. else:
  27. li[7] = li[7].replace('T',' ').replace('+08:00','').replace('/','-')
  28. try:
  29. li[6]=str(datetime.strptime(li[6],'%Y-%m-%d %H:%M'))
  30. li[7]=str(datetime.strptime(li[7],'%Y-%m-%d %H:%M'))
  31. # li[7]=str(datetime.strptime(li[7],'%Y/%m/%d %H:%M:%S'))
  32. except:
  33. pass
  34. # print("======================")
  35. # try:
  36. # li[6] = str(datetime.strptime(li[6][:17], '%Y-%m-%d %H:%M'))
  37. # li[7] = str(datetime.strptime(li[7][:17], '%Y/%m/%d %H:%M'))
  38. # except:
  39. # pass
  40. # li[6]='2020-11-01 00:00:00'
  41. # li[7]='2020-11-01 00:00:00'
  42. li[6] =li[6][:19]
  43. li[7] =li[7][:19]
  44. li[9] = '' if li[9] is None else li[9]
  45. li[5] = '' if li[5] is None else li[5]
  46. li[1] = '' if li[1] is None else li[1]
  47. data1.append(tuple(li))
  48. # 删除分区
  49. ck.execute("alter table order drop partition '{}' ".format(ymd))
  50. ck.insertMany("order", col, tuple(data1))
  51. def daily_qq(mydt):
  52. log.debug("sync daily_qq")
  53. table='daily_qq'
  54. col=db.quchen_text.getColumn("daily_qq")
  55. sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
  56. data = db.quchen_text.getData(sql)
  57. li1 = []
  58. for i in data:
  59. li2 = list(i)
  60. li2[1] = str(li2[1])
  61. li2[12]=round(i[12],4)
  62. li1.append(tuple(li2))
  63. ck.execute("alter table {} drop partition '{}' ".format(table, mydt))
  64. ck.insertMany(table, col, li1)
  65. def daily_vx(mydt):
  66. log.debug("sync daily_vx")
  67. table='daily_vx'
  68. col=db.quchen_text.getColumn(table)
  69. sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
  70. data = db.quchen_text.getData(sql)
  71. # print(len(data))
  72. li1 = []
  73. for i in data:
  74. li2 = list(i)
  75. li2[1] = str(li2[1])
  76. li2[5] = round(i[5],4)
  77. li2[6] = round(i[6],4)
  78. li2[7] = round(i[7],4)
  79. li2[8] = round(i[8],4)
  80. li2[10] = round(i[10],4)
  81. li1.append(tuple(li2))
  82. ck.execute("alter table {} drop partition '{}' ".format(table, mydt))
  83. ck.insertMany(table, col, li1)
  84. def run(ymd):
  85. log.debug("run "+ymd)
  86. try:
  87. order(ymd)
  88. except:
  89. DingTalkUtils.send("[order]同步到ck出错:" + e, '18860455786')
  90. try:
  91. daily_qq(ymd)
  92. except:
  93. DingTalkUtils.send("[daily_qq]到ck出错:" + e, '18860455786,')
  94. try:
  95. daily_vx(ymd)
  96. except:
  97. DingTalkUtils.send("[daily_vx]同步到ck出错:" + e, '18860455786,')
  98. if __name__ == '__main__':
  99. if sys.argv.__len__()>1:
  100. run(dt.get_n_days(-1))
  101. today=dt.get_n_days()
  102. run(today)
  103. try:
  104. run(today)
  105. except Exception as e:
  106. DingTalkUtils.send("订单同步到ck出错:"+e,'18860455786,')