user_action.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. from wechat_action.sql_tools import save_human_info, delete_wechat_info, save_wechat_info
  2. from wechat_api.get_wechat_info import WechatApi
  3. from wechat_action.create_ad_layout import CreateAd
  4. from wechat_action.create_ad_plan import CreateAdPlan
  5. from wechat_action import sql_tools
  6. from sqlalchemy import Table
  7. from communication_tools import dingtalk
  8. import json
  9. import logging
  10. from datetime import datetime
  11. from wechat_action.login_ad import LogIn
  12. # 定位为:接受请求之后对应的线程处理
  13. layout_create_action = 'create_ad_layout'
  14. ad_plan_create_action = 'create_ad_plan'
  15. refresh_wechat_action = 'refresh_wechat_info'
  16. def carry_plan(user_id, ad_plan_list, log_ad, db, cookie_canuse, task_name):
  17. # TODO:失败后 ,状态为:创建中 ----进行对应删除
  18. # 创建中的数据需要进行删除
  19. sql_session = db.DBSession()
  20. try:
  21. action_record_table = Table('action_record', db.metadata,
  22. autoload=True, autoload_with=db.engine)
  23. # cookies保存
  24. if not cookie_canuse:
  25. log_ad.cookies_save(log_ad)
  26. for _ in ad_plan_list:
  27. service_name = _['service_name']
  28. wechat_name = _['wechat_name']
  29. plan_name = _['title']
  30. try:
  31. # 1.检查1.落地页是否创建过了
  32. log_ad.select_ad_master(service_name, wechat_name)
  33. # 现在默认layout_name在30个字符以内
  34. layout_name = _['idea']['jump_type_page_type']['layout_name']
  35. if CreateAd.check_sucess_api(layout_name=layout_name, log_ad=log_ad):
  36. res = {'sucess': True, 'result_info': '已经创建过对应落地页'}
  37. sql_tools.action_record(res, sql_session, layout_create_action, user_id, layout_name,
  38. action_record_table,
  39. service_name, wechat_name, task_name)
  40. else:
  41. # 1.5无则创建落地页
  42. try:
  43. create_ad_layout = CreateAd(login_ad=log_ad, service_name=service_name, wechat_name=wechat_name)
  44. layout_typesetting_dict = sql_tools.get_layout_typesetting(sql_session, user_id,
  45. typesetting_name=layout_name)
  46. print(layout_typesetting_dict)
  47. layout_typesetting_dict = json.loads(layout_typesetting_dict)
  48. res = create_ad_layout.create_layout(layout_typesetting_dict)
  49. sql_tools.action_record(res, sql_session, layout_create_action, user_id, layout_name,
  50. action_record_table,
  51. service_name, wechat_name, task_name)
  52. except Exception as e:
  53. res = {'sucess': False, 'result_info': str(e)}
  54. sql_tools.action_record(res, sql_session, layout_create_action, user_id, plan_name,
  55. action_record_table,
  56. service_name, wechat_name, task_name)
  57. continue
  58. log_ad.refresh_driver()
  59. # 3.创建计划
  60. log_ad.select_ad_master(service_name, wechat_name)
  61. # plan_typesetting_dict = sql_tools.get_ad_plan_typesetting(sql_session=sql_session, user_id=user_id,
  62. # typesetting_name=plan_name)
  63. create_ad_plan = CreateAdPlan(login_ad=log_ad, task=_, service_name=service_name,
  64. wechat_name=wechat_name)
  65. res = create_ad_plan.run()
  66. # 4.更新action_record相关计划信息
  67. sql_tools.action_record(res, sql_session, ad_plan_create_action, user_id, plan_name,
  68. action_record_table,
  69. service_name, wechat_name, task_name)
  70. logging.info('创建计划任务结束')
  71. log_ad.refresh_driver()
  72. except Exception as e:
  73. log_ad.driver.save_screenshot(
  74. 'user_id:{}_time_{}_plan_name:{}_plan_error.png'.format(user_id, datetime.now().strftime(
  75. "%Y-%m-%d, %H:%M:%S"), plan_name))
  76. logging.error(str(e))
  77. log_ad.refresh_driver()
  78. res = {'sucess': False, 'result_info': str(e)}
  79. sql_tools.action_record(res, sql_session, ad_plan_create_action, user_id, plan_name,
  80. action_record_table,
  81. service_name, wechat_name, task_name)
  82. raise
  83. # 每次运行微信相关操作,对微信相关信息进行刷新
  84. get_human_info(user_id, log_ad, db, cookie_canuse, task_name)
  85. check_task_in_hand(user_id, db, log_ad)
  86. except Exception as e:
  87. sql_tools.update_task_status_error(sql_session=sql_session, user_id=user_id, task_name=task_name)
  88. dingtalk.send_message('user_id:{} 计划执行出错,进行检查\n{}'.format(user_id, str(e)))
  89. logging.error(e)
  90. raise
  91. finally:
  92. try:
  93. print('任务结束')
  94. log_ad.driver.quit()
  95. except:
  96. pass
  97. def get_human_info(user_id, log_ad, db, cookie_canuse, task_name):
  98. # 数据库
  99. action_record_table = Table('action_record', db.metadata,
  100. autoload=True, autoload_with=db.engine)
  101. human_info_table = Table('human_info', db.metadata,
  102. autoload=True, autoload_with=db.engine)
  103. wechat_info_table = Table('wechat_info', db.metadata,
  104. autoload=True, autoload_with=db.engine)
  105. sql_session = db.DBSession()
  106. # 行为记录对应参数
  107. object_name = ''
  108. service_name = ''
  109. wechat_name = ''
  110. try:
  111. # 1.cookies保存
  112. if not cookie_canuse:
  113. log_ad.cookies_save(log_ad)
  114. # wechat_info.每次都删除掉前面全部数据,进行更新
  115. # human_info 进行全局更新
  116. w_api = WechatApi(log_ad=log_ad)
  117. res_info = w_api.get_human_info()
  118. if not res_info['sucess']:
  119. dingtalk.send_message('user_id:{} 获取微信数据为空,进行检查'.format(user_id))
  120. log_ad.driver.close()
  121. sql_tools.action_record(res_info, sql_session, refresh_wechat_action, user_id,
  122. object_name, action_record_table,
  123. service_name, wechat_name, task_name)
  124. return
  125. # wechat info进行数据更新
  126. # 1.删除所有数据
  127. delete_wechat_info(sql_session=sql_session, user_id=user_id)
  128. # 2.重新添加一遍相关数据
  129. for _ in res_info['result_list']:
  130. service_name = _['service_name']
  131. wechat_name = _['wechat_name']
  132. appid = _['appid']
  133. wxname = _['wxname']
  134. wechat_info = {'appid': appid, 'wxname': wxname, 'service_name': service_name,
  135. 'wechat_name': wechat_name,
  136. 'user_id': user_id}
  137. wechat_insert = save_wechat_info(wechat_info=wechat_info,
  138. table_wechat=wechat_info_table)
  139. sql_session.execute(wechat_insert)
  140. sql_session.commit()
  141. logging.info('update wechat info')
  142. # human info 相关数据进行更新
  143. for _ in res_info['result_list']:
  144. service_name = _['service_name']
  145. wechat_name = _['wechat_name']
  146. human_info = _['data']['list']
  147. update_res = human_info_table.update() \
  148. .where(human_info_table.c.service_name == service_name) \
  149. .where(human_info_table.c.wechat_name == wechat_name) \
  150. .values(human_info=human_info)
  151. update_res = sql_session.execute(update_res)
  152. sql_session.commit()
  153. if update_res.rowcount == 0:
  154. human_info = {'service_name': service_name, 'wechat_name': wechat_name, 'human_info': human_info}
  155. human_insert = save_human_info(human_info=human_info,
  156. table_human=human_info_table)
  157. sql_session.execute(human_insert)
  158. sql_session.commit()
  159. logging.info('update human info')
  160. # 浏览器关闭
  161. log_ad.driver.close()
  162. sql_tools.action_record(res_info, sql_session, refresh_wechat_action,
  163. user_id, object_name, action_record_table,
  164. service_name, wechat_name, task_name)
  165. check_task_in_hand(user_id=user_id, db=db, log_ad=log_ad)
  166. except Exception as e:
  167. res_info = {'sucess': False, 'result_info': str(e)}
  168. sql_tools.action_record(res_info, sql_session, refresh_wechat_action,
  169. user_id, object_name, action_record_table,
  170. service_name, wechat_name, task_name)
  171. log_ad.driver.save_screenshot(
  172. 'user_id:{}_time_{}_wechat_info_error.png'.format(user_id, datetime.now().strftime("%Y-%m-%d, %H:%M:%S")))
  173. dingtalk.send_message('user_id:{} 获取微信数据出错,进行检查\n{}'.format(user_id, str(e)))
  174. logging.error(e)
  175. raise
  176. finally:
  177. try:
  178. print('任务结束')
  179. log_ad.driver.quit()
  180. except:
  181. pass
  182. def check_task_in_hand(user_id, db, log_ad):
  183. sql_session = db.DBSession()
  184. lines = sql_tools.get_task_in_hand_limit_one(user_id=user_id, sql_session=sql_session)
  185. task_name = ''
  186. ad_plan_list = []
  187. action_type = ''
  188. for _ in lines:
  189. typesetting, wechat_name, service_name, action_type, task_name = _
  190. ad_plan_list.append(json.loads(typesetting))
  191. if action_type == refresh_wechat_action:
  192. # 刷新log_ad,以免log_ad超过生命周期
  193. log_ad.driver.quit()
  194. log_ad = LogIn(user_id=user_id)
  195. get_human_info(user_id, log_ad, db, True, task_name)
  196. if action_type == ad_plan_create_action:
  197. log_ad.driver.quit()
  198. log_ad = LogIn(user_id=user_id)
  199. carry_plan(user_id, ad_plan_list, log_ad, db, True, task_name)
  200. if __name__ == "__main__":
  201. pass