sync_to_ck_task.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. from model.DataBaseUtils import MysqlUtils,CkUtils
  2. from model.DateUtils import DateUtils
  3. from model.log import logger
  # Module-level helpers shared by every sync function below.
  # log: logger instance from model.log (used via log.debug).
  log=logger()
  # db: MySQL access object; the functions below read through db.quchen_text.
  db = MysqlUtils()
  # ck: ClickHouse access object (used via ck.execute / ck.insertMany).
  ck = CkUtils()
  # dt: date helper; only getDateLists is used, by the __main__ backfill loop.
  dt = DateUtils()
  8. from datetime import datetime
  def order(ymd):
      """Re-sync one day of MySQL ``ods_order`` rows into the ClickHouse ``order`` table.

      Reads every ``ods_order`` row whose ``date`` equals ``ymd``, normalises a
      few fields, drops the ClickHouse partition named ``ymd`` and inserts the
      rows again.

      Args:
          ymd: Day to sync. It is interpolated verbatim into the MySQL filter and
              used as the ClickHouse partition name, so it should only come from
              trusted callers.
      """
      log.debug("sync order")
      col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
      sql = "select * from ods_order where date='{}'".format(ymd)
      data = db.quchen_text.getData(sql)

      rows = []
      for record in data:
          fields = list(record)
          # Zhangzhongyun time-format conversion: fields 0, 6 and 7 are sent as
          # strings. NOTE(review): these positions line up with date, order_time
          # and reg_time in `col` only if ods_order has the same column order.
          for idx in (0, 6, 7):
              fields[idx] = str(fields[idx])
          # Field 9 (from_novel in `col`) is sent as '' when MySQL returns NULL.
          if fields[9] is None:
              fields[9] = ''
          rows.append(tuple(fields))

      # Drop the day's partition before inserting, presumably so that a re-run
      # replaces the day instead of appending to it - confirm the partition key.
      ck.execute("alter table order drop partition '{}' ".format(ymd))
      ck.insertMany("order", col, tuple(rows))
  def daily_qq(mydt):
      """Copy one day of the MySQL ``daily_qq`` table into ClickHouse.

      Rows are selected where ``date`` equals ``'<mydt> 00:00:00'``. Field 1 of
      each row is stringified and field 12 is rounded to four decimals before
      the ClickHouse partition ``mydt`` is dropped and the rows are inserted.

      Args:
          mydt: Day to sync; used in the MySQL filter and as the partition name.
      """
      log.debug("sync daily_qq")
      table = 'daily_qq'
      columns = db.quchen_text.getColumn("daily_qq")
      query = "select * from {} where date='{} 00:00:00'".format(table, mydt)
      source_rows = db.quchen_text.getData(query)

      prepared = []
      for record in source_rows:
          fields = list(record)
          fields[1] = str(fields[1])
          fields[12] = round(record[12], 4)
          prepared.append(tuple(fields))

      drop_stmt = "alter table {} drop partition '{}' ".format(table, mydt)
      ck.execute(drop_stmt)
      ck.insertMany(table, columns, prepared)
  def daily_vx(mydt):
      """Copy one day of the MySQL ``daily_vx`` table into ClickHouse.

      Rows are selected where ``date`` equals ``'<mydt> 00:00:00'``. Field 1 of
      each row is stringified and fields 5, 6, 7, 8 and 10 are rounded to four
      decimals before the ClickHouse partition ``mydt`` is dropped and the rows
      are inserted.

      Args:
          mydt: Day to sync; used in the MySQL filter and as the partition name.
      """
      log.debug("sync daily_vx")
      table = 'daily_vx'
      columns = db.quchen_text.getColumn(table)
      query = "select * from {} where date='{} 00:00:00'".format(table, mydt)
      source_rows = db.quchen_text.getData(query)

      rounded_positions = (5, 6, 7, 8, 10)
      prepared = []
      for record in source_rows:
          fields = list(record)
          fields[1] = str(fields[1])
          for pos in rounded_positions:
              fields[pos] = round(record[pos], 4)
          prepared.append(tuple(fields))

      drop_stmt = "alter table {} drop partition '{}' ".format(table, mydt)
      ck.execute(drop_stmt)
      ck.insertMany(table, columns, prepared)
  # Ad campaigns (advertising plans)
  def daily_vx_campaign(ymd):
      """Copy one day of the MySQL ``daily_vx_campaign`` table into ClickHouse.

      Rows are selected where ``date`` equals ``'<ymd> 00:00:00'``. Field 1 of
      each row is stringified and fields 5, 6, 7, 8, 12, 13 and 15 are rounded
      to four decimals. The ClickHouse partition ``ymd`` of
      ``daily_vx_campaign`` is then dropped and the rows are inserted.

      Args:
          ymd: Day to sync; used in the MySQL filter and as the partition name.
      """
      table = 'daily_vx_campaign'
      sql = 'select * from ' + table + ' where date="{} 00:00:00" '.format(ymd)
      data = db.quchen_text.getData(sql)

      rows = []
      for record in data:
          fields = list(record)
          fields[1] = str(fields[1])
          for idx in (5, 6, 7, 8, 12, 13, 15):
              fields[idx] = round(fields[idx], 4)
          rows.append(tuple(fields))

      col = db.quchen_text.getColumn(table)
      # Drop the partition of THIS table. Targeting `order` here would delete
      # that day's order data and leave the old campaign rows in place.
      ck.execute("alter table {} drop partition '{}' ".format(table, ymd))
      ck.insertMany(table, col, tuple(rows))
  def advertiser_vx_qq():
      """Rebuild the ClickHouse ``advertiser_vx_qq`` table from MySQL.

      Unions ``advertiser_vx`` (tagged ``'vx'``) with ``advertiser_qq`` (tagged
      ``'qq'``), replaces NULL values with empty strings, truncates the
      ClickHouse table and inserts the rows.
      """
      table = "advertiser_vx_qq"
      col = "account_id,name,stage,pitcher,platform,book,tag"
      sql = "select account_id,name,stage,pitcher,platform,book,'vx' as tag from advertiser_vx " \
            " union " \
            "select account_id,name,stage,pitcher,platform,book,'qq' as tag from advertiser_qq"
      data = db.quchen_text.getData(sql)

      rows = [
          tuple('' if val is None else val for val in record)
          for record in data
      ]

      ck.execute("truncate table {}".format(table))
      # Insert through ck.insertMany, like the other sync functions in this
      # file, instead of pasting the Python repr of the rows into the SQL text:
      # the repr is not valid SQL once a value contains a quote, and an empty
      # result would produce an INSERT with no VALUES.
      if rows:
          ck.insertMany(table, col, rows)
  def pitcher_change():
      """Replace the ClickHouse ``pitcher_change`` table with the MySQL contents.

      NULL ``start_time`` / ``end_time`` values are read as empty strings
      (``ifnull`` in the query). The ClickHouse table is truncated and then
      refilled.
      """
      table = "pitcher_change"
      # NOTE(review): assumes the ClickHouse columns carry the same names as the
      # MySQL select list, as book_change and account_change do - confirm.
      col = "channel,pitcher,start_time,end_time"
      sql = "select channel,pitcher,ifnull(start_time,'') as start_time," \
            " ifnull(end_time,'') as end_time from pitcher_change"
      data = db.quchen_text.getData(sql)

      # Truncate the table that is refilled below. Truncating a different table
      # (`pitcher`) would leave the old rows here and add to them on every run.
      ck.execute("truncate table {}".format(table))
      # Use ck.insertMany rather than embedding str(data) in the statement; the
      # repr of the result set is not reliable SQL (quotes, trailing comma on a
      # one-row tuple, empty result).
      if data:
          ck.insertMany(table, col, data)
  def book_change():
      """Replace the ClickHouse ``book_change`` table with the MySQL contents.

      NULL ``start_time`` / ``end_time`` values are read as empty strings
      (``ifnull`` in the query). The ClickHouse table is truncated before the
      rows are inserted.
      """
      target = "book_change"
      columns = "name,book,start_time,end_time"
      query = (
          "select name,book,ifnull(start_time,'') as start_time,"
          "ifnull(end_time,'') as end_time from " + target
      )
      rows = db.quchen_text.getData(query)
      ck.execute("truncate table " + target)
      ck.insertMany(target, columns, rows)
  def account_change():
      """Replace the ClickHouse ``account_change`` table with the MySQL contents.

      NULL ``start_time`` / ``end_time`` values are read as empty strings
      (``ifnull`` in the query). The ClickHouse table is truncated before the
      rows are inserted.
      """
      target = "account_change"
      columns = "account_id,name,pitcher,start_time,end_time"
      query = (
          "select account_id,name,pitcher,ifnull(start_time,'') as start_time,"
          "ifnull(end_time,'') as end_time from " + target
      )
      rows = db.quchen_text.getData(query)
      ck.execute("truncate table " + target)
      ck.insertMany(target, columns, rows)
  def platform_change():
      """Replace the ClickHouse ``platform_change`` table with the MySQL contents.

      ``change_date`` is formatted as ``YYYY-MM-DD`` by MySQL (``date_format``
      in the query). The ClickHouse table is truncated before the rows are
      inserted.
      """
      target = "platform_change"
      columns = "name,primary_platform,current_platform,change_date"
      query = (
          "select name,primary_platform,current_platform,"
          "date_format(change_date,'%Y-%m-%d') as change_date from " + target
      )
      rows = db.quchen_text.getData(query)
      ck.execute("truncate table " + target)
      ck.insertMany(target, columns, rows)
  def adcreative():
      """Placeholder; performs no work yet.

      TODO: the table needs to be redesigned - it cannot be partitioned.
      """
      return None
  def campaign_vx():
      """Placeholder; performs no work yet.

      TODO: the table needs to be redesigned - it cannot be partitioned.

      A disabled draft in this spot selected every row of ``campaign_vx``
      through ``db.quchen_text.getData`` and did nothing further with it.
      """
      return None
  if __name__ == '__main__':
      # Script entry point: backfill the `order` table over a fixed, hard-coded
      # date range by running order() once per value from dt.getDateLists.
      # NOTE(review): presumably getDateLists yields one day string per call to
      # order() - confirm against model.DateUtils.
      backfill_days = dt.getDateLists('2019-05-20', '2020-12-27')
      for day in backfill_days:
          order(day)