sync_to_ck_task.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. from model.DataBaseUtils import MysqlUtils,CkUtils
  2. from model.DateUtils import DateUtils
  3. from model.log import logger
  4. log=logger()
  5. db = MysqlUtils()
  6. ck = CkUtils()
  7. dt = DateUtils()
  8. from datetime import datetime
  9. def order(ymd):
  10. log.debug("sync order")
  11. col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
  12. sql ="select * from `order` where date=UNIX_TIMESTAMP('{}') ".format(ymd)
  13. data = db.quchen_text.getData(sql)
  14. # 掌中云时间格式转换
  15. data1 = []
  16. for x in data:
  17. li = list(x)
  18. li[6] = li[6].replace('T',' ').replace('+08:00','').replace('/','-')
  19. if li[6] == None or li[6] == '':
  20. li[6] = '0000-00-00 00:00:00'
  21. if li[7]==None or li[7]=='':
  22. li[7]='0000-00-00 00:00:00'
  23. else:
  24. li[7] = li[7].replace('T',' ').replace('+08:00','').replace('/','-')
  25. try:
  26. li[6]=str(datetime.strptime(li[6],'%Y-%m-%d %H:%M'))
  27. li[7]=str(datetime.strptime(li[7],'%Y-%m-%d %H:%M'))
  28. # li[7]=str(datetime.strptime(li[7],'%Y/%m/%d %H:%M:%S'))
  29. except:
  30. pass
  31. # print("======================")
  32. # try:
  33. # li[6] = str(datetime.strptime(li[6][:17], '%Y-%m-%d %H:%M'))
  34. # li[7] = str(datetime.strptime(li[7][:17], '%Y/%m/%d %H:%M'))
  35. # except:
  36. # pass
  37. # li[6]='2020-11-01 00:00:00'
  38. # li[7]='2020-11-01 00:00:00'
  39. li[6] =li[6][:19]
  40. li[7] =li[7][:19]
  41. li[9] = '' if li[9] is None else li[9]
  42. li[5] = '' if li[5] is None else li[5]
  43. li[1] = '' if li[1] is None else li[1]
  44. data1.append(tuple(li))
  45. # 删除分区
  46. ck.execute("alter table order drop partition '{}' ".format(ymd))
  47. ck.insertMany("order", col, tuple(data1))
  48. def daily_qq(mydt):
  49. log.debug("sync daily_qq")
  50. table='daily_qq'
  51. col=db.quchen_text.getColumn("daily_qq")
  52. sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
  53. data = db.quchen_text.getData(sql)
  54. li1 = []
  55. for i in data:
  56. li2 = list(i)
  57. li2[1] = str(li2[1])
  58. li2[12]=round(i[12],4)
  59. li1.append(tuple(li2))
  60. ck.execute("alter table {} drop partition '{}' ".format(table, mydt))
  61. ck.insertMany(table, col, li1)
  62. def daily_vx(mydt):
  63. log.debug("sync daily_vx")
  64. table='daily_vx'
  65. col=db.quchen_text.getColumn(table)
  66. sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
  67. data = db.quchen_text.getData(sql)
  68. # print(len(data))
  69. li1 = []
  70. for i in data:
  71. li2 = list(i)
  72. li2[1] = str(li2[1])
  73. li2[5] = round(i[5],4)
  74. li2[6] = round(i[6],4)
  75. li2[7] = round(i[7],4)
  76. li2[8] = round(i[8],4)
  77. li2[10] = round(i[10],4)
  78. li1.append(tuple(li2))
  79. ck.execute("alter table {} drop partition '{}' ".format(table, mydt))
  80. ck.insertMany(table, col, li1)
  81. def daily_vx_campaign(ymd):
  82. table = 'daily_vx_campaign'
  83. sql = 'select * from ' + table + ' where date="{} 00:00:00" '.format(ymd)
  84. data = db.quchen_text.getData(sql)
  85. data1 = []
  86. for i in data:
  87. li = list(i)
  88. li[1] = str(li[1])
  89. li[5] = round(li[5], 4)
  90. li[6] = round(li[6], 4)
  91. li[7] = round(li[7], 4)
  92. li[8] = round(li[8], 4)
  93. li[12] = round(li[12], 4)
  94. li[13] = round(li[13], 4)
  95. li[15] = round(li[15], 4)
  96. data1.append(tuple(li))
  97. col = db.quchen_text.getColumn("daily_vx_campaign")
  98. ck.execute("alter table order drop partition '{}' ".format(ymd))
  99. ck.insertMany(table, col, tuple(data1))