tornado_api.py 24 KB


  1. from wechat_action.sql_models import DB
  2. from settings import using_config
  3. import tornado.log
  4. import tornado.ioloop
  5. import tornado.web
  6. import json
  7. import time
  8. from wechat_api.get_wechat_info import WechatApi
  9. from logging import handlers
  10. from datetime import datetime
  11. from wechat_action.login_ad import LogIn
  12. from wechat_action import sql_tools
  13. import threading
  14. from web_module import user_action
  15. from sqlalchemy import Table
  16. import json
  17. import pickle
  18. # TODO:需要添加上supervisor,来维护进程
  19. # TODO:有时间需要对tornado进行改进
  20. # TODO:需要有一套上线工具,来维持线上稳定
  21. db = DB(config=using_config)
  22. wechat_cookies_table = Table('wechat_cookies', db.metadata,
  23. autoload=True, autoload_with=db.engine)
  24. layout_typesetting_table = Table('layout_typesetting', db.metadata,
  25. autoload=True, autoload_with=db.engine)
  26. ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata,
  27. autoload=True, autoload_with=db.engine)
  28. action_record_table = Table('action_record', db.metadata,
  29. autoload=True, autoload_with=db.engine)
  30. # 1.实现本机服务
  31. # 2.实现线上docker-selenium服务
  32. class BaseHandler(tornado.web.RequestHandler):
  33. def options(self):
  34. pass
  35. def set_default_headers(self):
  36. self.set_header('Access-Control-Allow-Origin', '*')
  37. self.set_header('Access-Control-Allow-Headers', '*')
  38. self.set_header('Access-Control-Max-Age', 1000)
  39. self.set_header('Content-type', '*')
  40. self.set_header('Access-Control-Allow-Methods', '*')
  41. class create_ad_plan_local(BaseHandler):
  42. def post(self):
  43. request_dict = json.loads(self.request.body, encoding='utf-8')
  44. user_id = request_dict['user_id']
  45. ad_plan_list = request_dict['plan_list']
  46. print(user_id, ad_plan_list)
  47. sql_session = db.DBSession()
  48. if user_id is None or ad_plan_list is None:
  49. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  50. return
  51. # 落地页名字精确到毫秒,默认是全局唯一
  52. for _ in ad_plan_list:
  53. ad_plan_name = _['title']
  54. ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name,
  55. 'typesetting': json.dumps(_, ensure_ascii=False)}
  56. ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info(
  57. ad_plan_typesetting_info=ad_plan_typesetting_info,
  58. table_ad_plan_typesetting=ad_plan_typesetting_table)
  59. sql_session.execute(ad_plan_typesetting_inserte)
  60. sql_session.commit()
  61. self.write({'status': {'msg': 'success', "RetCode": 200}})
  62. class create_ad_plan(BaseHandler):
  63. # TODO:只要tornado开着就不允许修改数据库,------想好之后上线如何操作
  64. # TODO:需要与刷新用户cookie相关action 联动,---------正在刷新用户信息时,不能进行计划创建.反之同理
  65. def post(self):
  66. request_dict = json.loads(self.request.body, encoding='utf-8')
  67. print(request_dict)
  68. user_id = request_dict['user_id']
  69. ad_plan_list = request_dict['plan_list']
  70. # 1.查看是否cookie可用
  71. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  72. # 2.数据存入数据库
  73. print(user_id, ad_plan_list)
  74. sql_session = db.DBSession()
  75. if user_id is None or ad_plan_list is None:
  76. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  77. return
  78. # 2.1存计划数据
  79. for _ in ad_plan_list:
  80. ad_plan_name = _['title'].replace(' ', '')[:29]
  81. ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name,
  82. 'typesetting': json.dumps(_, ensure_ascii=False)}
  83. print('typesetting_info')
  84. print(_)
  85. print(ad_plan_typesetting_info)
  86. ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info(
  87. ad_plan_typesetting_info=ad_plan_typesetting_info,
  88. table_ad_plan_typesetting=ad_plan_typesetting_table)
  89. sql_session.execute(ad_plan_typesetting_inserte)
  90. # 2.2存行为历史记录
  91. task_name = '用户: {user_id} 时间:{time_sign}'.format(user_id=user_id,
  92. time_sign=datetime.now().strftime("%Y-%m-%d, %H:%M:%S"))
  93. for _ in ad_plan_list:
  94. # 1.查看历史中有无对应落地页
  95. # TODO:落地页直接存放到action_record,运行过程中进行不同公众号进行不同操作
  96. # 2.历史记录
  97. print(_)
  98. for action_type in ['create_ad_plan', 'create_ad_layout']:
  99. object_name = _['title'] if action_type == 'create_ad_plan' else _['idea']['jump_type_page_type'][
  100. 'layout_name']
  101. object_name = object_name.replace(' ', '')[:29]
  102. action_info = {'user_id': user_id, 'service_name': _['service_name'], 'wechat_name': _['wechat_name'],
  103. 'action_type': action_type, 'object_name': object_name, 'task_name': task_name,
  104. 'status': 'todo'}
  105. record_insert = sql_tools.save_action_record(action_record_info=action_info,
  106. table_action_record=action_record_table)
  107. sql_session.execute(record_insert)
  108. sql_session.commit()
  109. # 4.开始运行
  110. threading.Thread(target=user_action.carry_plan,
  111. args=(user_id, ad_plan_list, log_ad, db, cookie_canuse)).start()
  112. class get_ad_plan_local(BaseHandler):
  113. def get(self):
  114. user_id = self.get_argument('user_id', None)
  115. layout_name = self.get_argument('plan_name', None)
  116. sql_session = db.DBSession()
  117. if user_id is None:
  118. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  119. return
  120. # 落地页名字精确到毫秒,默认是全局唯一
  121. if layout_name:
  122. result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id,
  123. typesetting_name=layout_name)
  124. else:
  125. # TODO:之后修改一下,让其查询效率高点,like效率过低
  126. layout_name = ''
  127. result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id,
  128. typesetting_name=layout_name)
  129. print(result)
  130. result_ = []
  131. for i in range(len(result)):
  132. print(result[i])
  133. typesetting, name, create_time, update_time = result[i]
  134. _ = {}
  135. _['typesetting'] = json.loads(typesetting)
  136. _['ad_plan_name'] = name
  137. _['id'] = i
  138. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  139. _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S")
  140. result_.append(_)
  141. self.write({'status': {'msg': 'success', "RetCode": 200},
  142. 'local_ad_plan_info': result_})
  143. class create_ad_layout_remote(BaseHandler):
  144. # 1.批量创建落地页
  145. def post(self):
  146. user_id = self.get_argument("user_id", None)
  147. layout_name = self.get_argument("layout_name", None)
  148. # wechat_json :[{'service_name':'one','wechat_name':''},{'service_name':'','wechat_name':''}]
  149. wechat_json = self.get_argument('wechat_json', None)
  150. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  151. threading.Thread(target=user_action.create_layout,
  152. args=(user_id, layout_name, wechat_json, log_ad, db, cookie_canuse)).start()
  153. class create_ad_layout_local(BaseHandler):
  154. def post(self):
  155. # TODO:返回一个layout_name重复的一个信息
  156. request_dict = json.loads(self.request.body)
  157. user_id = request_dict['user_id']
  158. layout_typesetting = request_dict['layout_typesetting']
  159. layout_name = request_dict['layout_name']
  160. print(user_id, layout_typesetting, layout_name)
  161. print('layout-typesetting', type(layout_typesetting), layout_typesetting)
  162. sql_session = db.DBSession()
  163. if user_id is None or layout_name is None or layout_typesetting is None:
  164. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  165. return
  166. # 落地页名字精确到毫秒,默认是全局唯一
  167. layout_typesetting_info = {'user_id': user_id, 'name': layout_name,
  168. 'typesetting': layout_typesetting}
  169. layout_typesetting_inserte = sql_tools.save_layout_typesetting_info(
  170. layout_typesetting_info=layout_typesetting_info,
  171. table_layout_typesetting=layout_typesetting_table)
  172. sql_session.execute(layout_typesetting_inserte)
  173. sql_session.commit()
  174. self.write({'status': {'msg': 'success', "RetCode": 200}})
  175. class get_ad_layout_local(BaseHandler):
  176. def get(self):
  177. user_id = self.get_argument('user_id', None)
  178. layout_name = self.get_argument('layout_name', None)
  179. sql_session = db.DBSession()
  180. if user_id is None:
  181. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  182. return
  183. # 落地页名字精确到毫秒,默认是全局唯一
  184. if layout_name:
  185. result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id,
  186. typesetting_name=layout_name)
  187. else:
  188. # TODO:之后修改一下,让其查询效率高点,like效率过低
  189. layout_name = ''
  190. result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id,
  191. typesetting_name=layout_name)
  192. print(result)
  193. result_ = []
  194. for i in range(len(result)):
  195. print(result[i])
  196. typesetting, name, create_time, update_time = result[i]
  197. _ = {}
  198. _['typesetting'] = json.loads(typesetting)
  199. _['layout_name'] = name
  200. _['id'] = i
  201. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  202. _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S")
  203. result_.append(_)
  204. self.write({'status': {'msg': 'success', "RetCode": 200},
  205. 'local_layout_info': result_})
  206. # TODO:wechat_info,human_info 这两张表有空时需要进行对应改进
  207. # TODO:ad_human_info ,ad_wecaht_info 两个的行为需要与create_ad_plan 进行交互
  208. class ad_human_info(BaseHandler):
  209. # TODO:设置一下update---table,如果失败了sql_session需要关闭
  210. @staticmethod
  211. def refresh_wechat_cookies(tornado_web, user_id):
  212. # TODO:添加互动接口,添加状态字段,打开selenium就变换
  213. # 1.返回二维码链接
  214. # ----1.查看cookie是否可用
  215. sql_session = db.DBSession()
  216. cookie_db = sql_tools.get_wechat_cookies(sql_session, user_id=user_id)
  217. # 进行登录操作
  218. log_ad = LogIn(user_id=user_id)
  219. # 使driver可以使用
  220. cookie_canuse = False
  221. if cookie_db:
  222. cookie_db = pickle.loads(cookie_db)
  223. if not log_ad.wechat_cookies_check_alive(cookie_db):
  224. # cookie 不能使用
  225. wechat_code = log_ad.log_in()
  226. tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
  227. 'wechat_code': wechat_code})
  228. print('cookie can not use')
  229. else:
  230. # cookie 可以继续使用
  231. cookie_canuse = True
  232. log_ad.driver.get('https://a.weixin.qq.com/index.html')
  233. tornado_web.write({'status': {'msg': 'success', "RetCode": 200}})
  234. else:
  235. # cookie 不能使用
  236. wechat_code = log_ad.log_in()
  237. tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
  238. 'wechat_code': wechat_code})
  239. return log_ad, cookie_canuse
  240. # 1.人群包获取
  241. def get(self):
  242. try:
  243. # TODO:添加分页
  244. # 0.是否刷新
  245. # 1.获取userid,以及是否刷新
  246. user_id = self.get_argument("user_id", None)
  247. human_package_name = self.get_argument('human_package_name', None)
  248. is_refresh = self.get_argument("is_refresh", None)
  249. wechat_name = self.get_argument('wechat_name', None)
  250. service_name = self.get_argument('service_name', None)
  251. print(user_id, is_refresh)
  252. if user_id is None or is_refresh is None or wechat_name is None or service_name is None:
  253. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  254. return
  255. sql_session = db.DBSession()
  256. # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
  257. if int(is_refresh) == 1:
  258. log_ad, cookie_canuse = self.refresh_wechat_cookies(self, user_id=user_id)
  259. threading.Thread(target=user_action.get_human_info,
  260. args=(
  261. user_id, log_ad, db, cookie_canuse)).start()
  262. else:
  263. # 1.查看是否在刷新,
  264. # 在刷新中,
  265. # 返回正在刷新
  266. # -------不管上面逻辑让他们多刷新几次
  267. # 不在刷新
  268. # 返回对应数据
  269. # 2.获取userid对应数据
  270. result = sql_tools.get_human_info(sql_session=sql_session,
  271. service_name=service_name, wechat_name=wechat_name)
  272. print(result)
  273. result = json.loads(result)
  274. if human_package_name:
  275. result = [_ for _ in result if human_package_name in _['name']]
  276. result_ = []
  277. for i in range(len(result)):
  278. _ = result[i]
  279. _['id'] = i
  280. result_.append(_)
  281. self.write({'status': {'msg': 'success', "RetCode": 200},
  282. 'human_info': result})
  283. except Exception as e:
  284. logging.error(str(e))
  285. try:
  286. sql_session.commit()
  287. except:
  288. pass
  289. class ad_wechat_info(BaseHandler):
  290. # 1.公众号相关信息获取
  291. def get(self):
  292. # TODO:添加分页,
  293. # 公众号,服务商,唯一id设计或者获取
  294. # 0.是否刷新
  295. # 1.获取userid,以及是否刷新
  296. user_id = self.get_argument("user_id", None)
  297. is_refresh = self.get_argument("is_refresh", None)
  298. print(user_id, is_refresh)
  299. if user_id is None or is_refresh is None:
  300. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  301. return
  302. sql_session = db.DBSession()
  303. # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
  304. if int(is_refresh) == 1:
  305. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  306. threading.Thread(target=user_action.get_human_info,
  307. args=(
  308. user_id, log_ad, db, cookie_canuse)).start()
  309. else:
  310. # 1.查看是否在刷新,
  311. # 在刷新中,
  312. # 返回正在刷新
  313. # -------不管上面逻辑让他们多刷新几次
  314. # 不在刷新
  315. # 返回对应数据
  316. # 2.获取userid对应数据
  317. result = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id)
  318. result_list = []
  319. for _ in result:
  320. service_name, wechat_name = _
  321. result_list.append({'service_name': service_name, 'wechat_name': wechat_name})
  322. print(result_list)
  323. self.write({'status': {'msg': 'success', "RetCode": 200},
  324. 'wechat_info': result_list})
  325. class delete_ad_layout(BaseHandler):
  326. def get(self):
  327. user_id = self.get_argument('user_id', None)
  328. layout_name = self.get_argument('layout_name', None)
  329. sql_session = db.DBSession()
  330. if user_id is None or layout_name is None:
  331. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  332. return
  333. # 落地页名字精确到毫秒,默认是全局唯一
  334. sql_tools.delete_layout_typesetting_vir(sql_session=sql_session, user_id=user_id,
  335. typesetting_name=layout_name)
  336. self.write({'status': {'msg': 'success', "RetCode": 200}})
  337. class delete_ad_plan(BaseHandler):
  338. def get(self):
  339. user_id = self.get_argument('user_id', None)
  340. plan_name = self.get_argument('plan_name', None)
  341. service_name = self.get_argument('service_name', None)
  342. wechat_name = self.get_argument('wechat_name', None)
  343. sql_session = db.DBSession()
  344. if user_id is None or plan_name is None:
  345. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  346. return
  347. # 落地页名字精确到毫秒,默认是全局唯一
  348. sql_tools.delete_ad_plan_typesetting_vir(sql_session=sql_session, user_id=user_id,
  349. typesetting_name=plan_name, wechat_name=wechat_name,
  350. service_name=service_name)
  351. self.write({'status': {'msg': 'success', "RetCode": 200}})
  352. class get_ad_wechat_service_name(BaseHandler):
  353. def get(self):
  354. user_id = self.get_argument('user_id', None)
  355. sql_session = db.DBSession()
  356. if user_id is None:
  357. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  358. return
  359. result = sql_tools.get_wechat_info_service_name(sql_session=sql_session, user_id=user_id)
  360. result_list = []
  361. for _ in result:
  362. service_name = _
  363. result_list.append({'service_name': service_name})
  364. print(result_list)
  365. self.write({'status': {'msg': 'success', "RetCode": 200},
  366. 'wechat_info': result_list})
  367. class get_ad_wechat_wechat_name(BaseHandler):
  368. def get(self):
  369. user_id = self.get_argument('user_id', None)
  370. service_name = self.get_argument('service_name', None)
  371. sql_session = db.DBSession()
  372. if user_id is None or service_name is None:
  373. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  374. return
  375. result = sql_tools.get_wechat_info_wechat_name(sql_session=sql_session, user_id=user_id,
  376. service_name=service_name)
  377. result_list = []
  378. for _ in result:
  379. service_name, wechat_name = _
  380. result_list.append({'service_name': service_name, 'wechat_name': wechat_name})
  381. print(result_list)
  382. self.write({'status': {'msg': 'success', "RetCode": 200},
  383. 'wechat_info': result_list})
  384. class get_plan_action_record(BaseHandler):
  385. def get(self):
  386. user_id = self.get_argument('user_id', None)
  387. service_name = self.get_argument('service_name', None)
  388. wechat_name = self.get_argument('wechat_name', None)
  389. status = self.get_argument('status', None)
  390. plan_name = self.get_argument('plan_name', None)
  391. sql_session = db.DBSession()
  392. if user_id is None:
  393. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  394. return
  395. # 落地页名字精确到毫秒,默认是全局唯一
  396. result = sql_tools.get_plan_record(sql_session=sql_session, user_id=user_id,
  397. service_name=service_name, wechat_name=wechat_name,
  398. status=status, plan_name=plan_name)
  399. result_ = []
  400. for i in range(len(result)):
  401. print(result[i])
  402. user_id, name, service_name, wechat_name, create_time, status, typesetting, wechat_id_info = result[i]
  403. _ = {}
  404. _['typesetting'] = json.loads(typesetting)
  405. _['ad_plan_name'] = name
  406. _['id'] = i
  407. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  408. _['service_name'] = service_name
  409. _['wechat_name'] = wechat_name
  410. _['wechat_id_info'] = wechat_id_info
  411. _['status'] = status
  412. result_.append(_)
  413. self.write({'status': {'msg': 'success', "RetCode": 200},
  414. 'local_ad_plan_info': result_})
  415. class get_all_ad_task(BaseHandler):
  416. def get(self):
  417. user_id = self.get_argument('user_id', None)
  418. sql_session = db.DBSession()
  419. if user_id is None:
  420. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  421. return
  422. # 落地页名字精确到毫秒,默认是全局唯一
  423. result = sql_tools.get_ad_task(sql_session=sql_session, user_id=user_id)
  424. task_dict = {}
  425. localtion = ['wechat', '']
  426. for _ in result:
  427. task_name, status, task_status_num, create_time, typesetting = _
  428. print(typesetting)
  429. typesetting = json.loads(typesetting)
  430. if typesetting['plan_base'][1] == 'pyq':
  431. localtion[1] = 'pyq'
  432. create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
  433. if task_name not in task_dict.keys():
  434. task_dict[task_name] = {}
  435. task_dict[task_name][status] = (task_status_num, create_time)
  436. result_ = []
  437. num = 0
  438. for k, v in task_dict.items():
  439. # TODO:修改为dict的sort
  440. sum_num = 0
  441. print(k, v)
  442. new_dict = {}
  443. create_time = None
  444. for k_, v_ in v.items():
  445. task_status_num, create_time = v_
  446. sum_num = sum_num + task_status_num
  447. new_dict[k_] = task_status_num
  448. status = 'todo' if 'todo' in new_dict.keys() else 'done'
  449. task_dict[k]['sum_num'] = sum_num
  450. new_dict['sum_num'] = sum_num
  451. result_.append(
  452. {'task_name': k, 'task_info': new_dict, 'create_time': create_time, 'channel': localtion[0],
  453. 'localtion': localtion[1], 'id': num, 'status': status})
  454. num = num + 1
  455. print(json.dumps(task_dict))
  456. self.write({'status': {'msg': 'success', "RetCode": 200},
  457. 'local_ad_plan_info': result_})
  458. def heart_jump():
  459. # TODO:tornado 心跳检测,下周做----线程不断检查,线程生命周期60分钟
  460. pass
  461. def make_app():
  462. return tornado.web.Application([
  463. ("/get_all_ad_task", get_all_ad_task), # 获取所有任务状态,
  464. ("/create_ad_plan", create_ad_plan), #
  465. ("/get_ad_wechat_service_name", get_ad_wechat_service_name),
  466. ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name),
  467. # ("/create_ad_plan_local", create_ad_plan_local),
  468. ("/create_ad_layout_local", create_ad_layout_local),
  469. ("/get_layout_local", get_ad_layout_local),
  470. ("/get_ad_plan_local", get_ad_plan_local),
  471. ("/delete_layout_local", delete_ad_layout),
  472. ("/delete_ad_plan_local", delete_ad_plan),
  473. # ("/create_ad_layout_remote", create_ad_layout_remote),
  474. ("/ad_human_info", ad_human_info),
  475. ("/ad_wechat_info", ad_wechat_info),
  476. ("/get_plan_action_record", get_plan_action_record),
  477. ], debug=True, autoreload=True)
  478. if __name__ == "__main__":
  479. import logging
  480. logging.basicConfig(
  481. handlers=[
  482. logging.handlers.RotatingFileHandler('./tornado.log',
  483. maxBytes=10 * 1024 * 1024,
  484. backupCount=5,
  485. encoding='utf-8')
  486. , logging.StreamHandler() # 供输出使用
  487. ],
  488. level=logging.INFO,
  489. format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
  490. )
  491. handler = logging.FileHandler('tornado.log')
  492. logger = logging.getLogger()
  493. logger.addHandler(handler)
  494. logger.setLevel(logging.INFO)
  495. app = make_app()
  496. app.listen(8888)
  497. tornado.ioloop.IOLoop.current().start()