sync_to_ck_task.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from model.DataBaseUtils import MysqlUtils,CkUtils
  2. from model.DateUtils import DateUtils
  3. from model.log import logger
  4. log=logger()
  5. db = MysqlUtils()
  6. ck = CkUtils()
  7. dt = DateUtils()
  8. def order_sync_ck(ymd):
  9. log.debug("sync order")
  10. col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
  11. sql ="select * from ods_order where date='{}'".format(ymd)
  12. data = db.quchen_text.getData(sql)
  13. data1 = []
  14. for x in data:
  15. li = list(x)
  16. li[0]=str(li[0])
  17. li[5]='' if li[5]==None else li[5]
  18. li[6]=str(li[6])
  19. li[7]='0000-00-00 00:00:00' if li[7]==None else str(li[7])
  20. li[9]='' if li[9]==None else li[9]
  21. data1.append(tuple(li))
  22. # 删除分区
  23. ck.execute("alter table order drop partition '{}' ".format(ymd))
  24. ck.insertMany("order", col, tuple(data1))
  25. # 广告计划
  26. def daily_vx_campaign(ymd):
  27. table = 'daily_vx_campaign'
  28. sql = 'select * from ' + table + ' where date="{} 00:00:00" '.format(ymd)
  29. data = db.quchen_text.getData(sql)
  30. data1 = []
  31. for i in data:
  32. li = list(i)
  33. li[1] = str(li[1])
  34. li[5] = round(li[5], 4)
  35. li[6] = round(li[6], 4)
  36. li[7] = round(li[7], 4)
  37. li[8] = round(li[8], 4)
  38. li[12] = round(li[12], 4)
  39. li[13] = round(li[13], 4)
  40. li[15] = round(li[15], 4)
  41. data1.append(tuple(li))
  42. col = db.quchen_text.getColumn("daily_vx_campaign")
  43. ck.execute("alter table order drop partition '{}' ".format(ymd))
  44. ck.insertMany(table, col, tuple(data1))
  45. def adcreative():
  46. """todo:表需要重新设计 无法分区"""
  47. pass
  48. def campaign_vx():
  49. """todo:表需要重新设计 无法分区"""
  50. # table="campaign_vx"
  51. # sql="select * from {}".format(table)
  52. # data=db.quchen_text.getData(sql)
  53. pass
  54. def dw_order_channel_sync_ck(ymd):
  55. table='dw_daily_channel'
  56. sql = f"select * from {table} where dt='{ymd}'"
  57. col=ck.getColumns(f"{table}",str=True)
  58. data = db.quchen_text.get_data_list(sql)
  59. data1=[]
  60. for i in data:
  61. i[0]=str(i[0])
  62. i[8]=str(i[8])
  63. i[11]=str(i[11])
  64. i[15]=str(i[15])
  65. i[-1]='' if i[-1]==None else i[-1]
  66. data1.append(tuple(i))
  67. ck.execute(f"alter table {table} drop partition '{ymd}' ")
  68. ck.insertMany(f"{table}", col, tuple(data1))
  69. if __name__ == '__main__':
  70. dw_order_channel_sync_ck(dt.get_n_days(-2))
  71. # for i in dt.getDateLists('2019-03-18','2020-12-17'):
  72. # # order(i)
  73. # dw_order_channel_sync_ck(i)