get_order.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. from app.api_data.order_util import *
  2. from model.DataBaseUtils import MysqlUtils
  3. from concurrent.futures import ThreadPoolExecutor
  4. from model.DingTalkUtils import DingTalkUtils
  5. db=MysqlUtils()
  6. def get_account(plactform):
  7. data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}'")
  8. new_data = []
  9. for i in data:
  10. new_data.append(i[0].replace('\n', '').split(","))
  11. return new_data
  12. def get_new_account(plactform):
  13. data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' and create_time>='{du.get_n_days(-1)}'")
  14. new_data = []
  15. for i in data:
  16. new_data.append(i[0].replace('\n', '').split(","))
  17. return new_data
  18. def yangguang(start=None, end=None,new=None):
  19. if start:
  20. start = start+' 00:00:00'
  21. end = end+' 23:59:59'
  22. else:
  23. start = du.getTodayOrYestoday() + ' 00:00:00'
  24. end = du.get_n_hours_ago(0)
  25. client_id = 10008097
  26. token = '2xa1d55tTPBjeEA8Ho'
  27. accounts=get_account("阳光") if new is None else get_new_account('阳光')
  28. if accounts.__len__()==0:
  29. return
  30. else:
  31. print(f"阳光账号数:{accounts.__len__()}")
  32. for i in accounts:
  33. stage = i[0]
  34. vip_id = i[1]
  35. print(vip_id)
  36. get_yg_vip_channel(stage, vip_id, client_id, token)
  37. get_yg_data(stage, vip_id, client_id, token, start, end)
  38. x=1
  39. while True:
  40. a = db.quchen_text.getOne("select count(1) from yangguang_path where update_time is null")
  41. print(f" vip 待处理数量 {a} 正在等待数据回调")
  42. if a == 0:
  43. break
  44. time.sleep(60)
  45. x+=1
  46. if x>15:
  47. DingTalkUtils.send('阳光订单回调延时15min','18860455786')
  48. break
  49. for i in accounts:
  50. vip_id = i[1]
  51. stage=i[0]
  52. parse_yg_data(vip_id,stage)
  53. def huasheng(start=None,end=None,new=None):
  54. if start is None:
  55. start = end = du.getTodayOrYestoday()
  56. executor = ThreadPoolExecutor(max_workers=5)
  57. accounts = get_account("花生") if new is None else get_new_account('花生')
  58. if len(accounts)==0:
  59. return
  60. else:
  61. print(f'花生有账号{len(accounts)}个')
  62. li = []
  63. for account in accounts:
  64. channel_data = get_hs_channel(account)
  65. if not channel_data:
  66. continue
  67. else:
  68. print(f"账号:{account[2]} 有channel{len(channel_data)}个")
  69. for merchant in channel_data:
  70. executor.submit(get_huasheng_order_task, start, end, account, merchant,li)
  71. executor.shutdown(True)
  72. save_hs_data(li)
  73. def hourly():
  74. yangguang()
  75. huasheng()
  76. def daily():
  77. st = du.get_n_days(-10)
  78. et = du.get_n_days(-1)
  79. yangguang(et, et)
  80. huasheng(et, et)
  81. # 昨日新增账户 自动往前跑10天数据
  82. yangguang(st, et, new=True)
  83. huasheng(st, et, new=True)
  84. if __name__ == '__main__':
  85. yangguang()
  86. huasheng()
  87. """要是只跑一个账号 把 get_yg_acccount() 里面的sql where 条件加上 id=xxx"""