# tornado_api.py (23 KB)


import json
import logging
import pickle
import threading
from logging import handlers

import tornado.ioloop
import tornado.log
import tornado.web
from sqlalchemy import Table

from settings import using_config
from web_module import user_action
from wechat_action import sql_tools
from wechat_action.login_ad import LogIn
from wechat_action.sql_models import DB
  14. # TODO:需要添加上supervisor,来维护进程
  15. # TODO:有时间需要对tornado进行改进
  16. # TODO:需要有一套上线工具,来维持线上稳定
  17. db = DB(config=using_config)
  18. wechat_cookies_table = Table('wechat_cookies', db.metadata,
  19. autoload=True, autoload_with=db.engine)
  20. layout_typesetting_table = Table('layout_typesetting', db.metadata,
  21. autoload=True, autoload_with=db.engine)
  22. ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata,
  23. autoload=True, autoload_with=db.engine)
  24. action_record_table = Table('action_record', db.metadata,
  25. autoload=True, autoload_with=db.engine)
  26. # 1.实现本机服务
  27. # 2.实现线上docker-selenium服务
  28. class BaseHandler(tornado.web.RequestHandler):
  29. def options(self):
  30. pass
  31. def set_default_headers(self):
  32. self.set_header('Access-Control-Allow-Origin', '*')
  33. self.set_header('Access-Control-Allow-Headers', '*')
  34. self.set_header('Access-Control-Max-Age', 1000)
  35. self.set_header('Content-type', '*')
  36. self.set_header('Access-Control-Allow-Methods', '*')
  37. class create_ad_plan_local(BaseHandler):
  38. def post(self):
  39. request_dict = json.loads(self.request.body, encoding='utf-8')
  40. user_id = request_dict['user_id']
  41. ad_plan_list = request_dict['plan_list']
  42. print(user_id, ad_plan_list)
  43. sql_session = db.DBSession()
  44. if user_id is None or ad_plan_list is None:
  45. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  46. return
  47. # 落地页名字精确到毫秒,默认是全局唯一
  48. for _ in ad_plan_list:
  49. ad_plan_name = _['title']
  50. ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name,
  51. 'typesetting': json.dumps(_, ensure_ascii=False)}
  52. ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info(
  53. ad_plan_typesetting_info=ad_plan_typesetting_info,
  54. table_ad_plan_typesetting=ad_plan_typesetting_table)
  55. sql_session.execute(ad_plan_typesetting_inserte)
  56. sql_session.commit()
  57. self.write({'status': {'msg': 'success', "RetCode": 200}})
  58. class create_ad_plan(BaseHandler):
  59. # TODO:只要tornado开着就不允许修改数据库,------想好之后上线如何操作
  60. # TODO:需要与刷新用户cookie相关action 联动,---------正在刷新用户信息时,不能进行计划创建.反之同理
  61. # TODO:名字检查----只保留三种符号(.-_),中文字符长度一,数字字符长度二
  62. def post(self):
  63. try:
  64. sql_session = db.DBSession()
  65. request_dict = json.loads(self.request.body, encoding='utf-8')
  66. print(request_dict)
  67. user_id = request_dict['user_id']
  68. ad_plan_list = request_dict['plan_list']
  69. # 1.查看是否cookie可用
  70. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  71. # 2.数据存入数据库
  72. print(user_id, ad_plan_list)
  73. if user_id is None or ad_plan_list is None:
  74. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  75. return
  76. # 2.1存计划数据
  77. for _ in ad_plan_list:
  78. ad_plan_name = _['title']
  79. ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name,
  80. 'typesetting': json.dumps(_, ensure_ascii=False)}
  81. print('typesetting_info')
  82. print(_)
  83. print(ad_plan_typesetting_info)
  84. ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info(
  85. ad_plan_typesetting_info=ad_plan_typesetting_info,
  86. table_ad_plan_typesetting=ad_plan_typesetting_table)
  87. sql_session.execute(ad_plan_typesetting_inserte)
  88. # 4.开始运行
  89. threading.Thread(target=user_action.carry_plan,
  90. args=(user_id, ad_plan_list, log_ad, db, cookie_canuse)).start()
  91. except Exception as e:
  92. self.write('eror')
  93. try:
  94. sql_session.commit()
  95. except:
  96. pass
  97. class get_ad_plan_local(BaseHandler):
  98. def get(self):
  99. user_id = self.get_argument('user_id', None)
  100. layout_name = self.get_argument('plan_name', None)
  101. sql_session = db.DBSession()
  102. if user_id is None:
  103. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  104. return
  105. # 落地页名字精确到毫秒,默认是全局唯一
  106. if layout_name:
  107. result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id,
  108. typesetting_name=layout_name)
  109. else:
  110. # TODO:之后修改一下,让其查询效率高点,like效率过低
  111. layout_name = ''
  112. result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id,
  113. typesetting_name=layout_name)
  114. print(result)
  115. result_ = []
  116. for i in range(len(result)):
  117. print(result[i])
  118. typesetting, name, create_time, update_time = result[i]
  119. _ = {}
  120. _['typesetting'] = json.loads(typesetting)
  121. _['ad_plan_name'] = name
  122. _['id'] = i
  123. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  124. _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S")
  125. result_.append(_)
  126. self.write({'status': {'msg': 'success', "RetCode": 200},
  127. 'local_ad_plan_info': result_})
  128. class create_ad_layout_local(BaseHandler):
  129. def post(self):
  130. # TODO:返回一个layout_name重复的一个信息
  131. request_dict = json.loads(self.request.body)
  132. user_id = request_dict['user_id']
  133. layout_typesetting = request_dict['layout_typesetting']
  134. layout_name = request_dict['layout_name']
  135. print(user_id, layout_typesetting, layout_name)
  136. print('layout-typesetting', type(layout_typesetting), layout_typesetting)
  137. sql_session = db.DBSession()
  138. if user_id is None or layout_name is None or layout_typesetting is None:
  139. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  140. return
  141. # 落地页名字精确到毫秒,默认是全局唯一
  142. layout_typesetting_info = {'user_id': user_id, 'name': layout_name,
  143. 'typesetting': layout_typesetting}
  144. layout_typesetting_inserte = sql_tools.save_layout_typesetting_info(
  145. layout_typesetting_info=layout_typesetting_info,
  146. table_layout_typesetting=layout_typesetting_table)
  147. sql_session.execute(layout_typesetting_inserte)
  148. sql_session.commit()
  149. self.write({'status': {'msg': 'success', "RetCode": 200}})
  150. class get_ad_layout_local(BaseHandler):
  151. def get(self):
  152. user_id = self.get_argument('user_id', None)
  153. layout_name = self.get_argument('layout_name', None)
  154. sql_session = db.DBSession()
  155. if user_id is None:
  156. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  157. return
  158. # 落地页名字精确到毫秒,默认是全局唯一
  159. if layout_name:
  160. result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id,
  161. typesetting_name=layout_name)
  162. else:
  163. # TODO:之后修改一下,让其查询效率高点,like效率过低
  164. layout_name = ''
  165. result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id,
  166. typesetting_name=layout_name)
  167. print(result)
  168. result_ = []
  169. for i in range(len(result)):
  170. print(result[i])
  171. typesetting, name, create_time, update_time = result[i]
  172. _ = {}
  173. _['typesetting'] = json.loads(typesetting)
  174. _['layout_name'] = name
  175. _['id'] = i
  176. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  177. _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S")
  178. result_.append(_)
  179. self.write({'status': {'msg': 'success', "RetCode": 200},
  180. 'local_layout_info': result_})
  181. # TODO:wechat_info,human_info 这两张表有空时需要进行对应改进
  182. # TODO:ad_human_info ,ad_wecaht_info 两个的行为需要与create_ad_plan 进行交互
  183. class ad_human_info(BaseHandler):
  184. # TODO:设置一下update---table,如果失败了sql_session需要关闭
  185. @staticmethod
  186. def refresh_wechat_cookies(tornado_web, user_id):
  187. # TODO:添加互动接口,添加状态字段,打开selenium就变换
  188. # 1.返回二维码链接
  189. # ----1.查看cookie是否可用
  190. sql_session = db.DBSession()
  191. cookie_db = sql_tools.get_wechat_cookies(sql_session, user_id=user_id)
  192. # 进行登录操作
  193. log_ad = LogIn(user_id=user_id)
  194. # 使driver可以使用
  195. cookie_canuse = False
  196. if cookie_db:
  197. cookie_db = pickle.loads(cookie_db)
  198. if not log_ad.wechat_cookies_check_alive(cookie_db):
  199. # cookie 不能使用
  200. wechat_code = log_ad.log_in()
  201. tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
  202. 'wechat_code': wechat_code})
  203. print('cookie can not use')
  204. else:
  205. # cookie 可以继续使用
  206. cookie_canuse = True
  207. log_ad.driver.get('https://a.weixin.qq.com/index.html')
  208. tornado_web.write({'status': {'msg': 'success', "RetCode": 200}})
  209. else:
  210. # cookie 不能使用
  211. wechat_code = log_ad.log_in()
  212. tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
  213. 'wechat_code': wechat_code})
  214. return log_ad, cookie_canuse
  215. # 1.人群包获取
  216. def get(self):
  217. sql_session = db.DBSession()
  218. try:
  219. # TODO:添加分页
  220. # 0.是否刷新
  221. # 1.获取userid,以及是否刷新
  222. user_id = self.get_argument("user_id", None)
  223. human_package_name = self.get_argument('human_package_name', None)
  224. is_refresh = self.get_argument("is_refresh", None)
  225. wechat_name = self.get_argument('wechat_name', None)
  226. service_name = self.get_argument('service_name', None)
  227. print(user_id, is_refresh)
  228. if user_id is None or is_refresh is None or wechat_name is None or service_name is None:
  229. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  230. return
  231. # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
  232. if int(is_refresh) == 1:
  233. log_ad, cookie_canuse = self.refresh_wechat_cookies(self, user_id=user_id)
  234. threading.Thread(target=user_action.get_human_info,
  235. args=(
  236. user_id, log_ad, db, cookie_canuse)).start()
  237. else:
  238. # 1.查看是否在刷新,
  239. # 在刷新中,
  240. # 返回正在刷新
  241. # -------不管上面逻辑让他们多刷新几次
  242. # 不在刷新
  243. # 返回对应数据
  244. # 2.获取userid对应数据
  245. result = sql_tools.get_human_info(sql_session=sql_session,
  246. service_name=service_name, wechat_name=wechat_name)
  247. print(result)
  248. result = json.loads(result)
  249. if human_package_name:
  250. result = [_ for _ in result if human_package_name in _['name']]
  251. result_ = []
  252. for i in range(len(result)):
  253. _ = result[i]
  254. _['id'] = i
  255. result_.append(_)
  256. self.write({'status': {'msg': 'success', "RetCode": 200},
  257. 'human_info': result})
  258. except Exception as e:
  259. logging.error(str(e))
  260. try:
  261. sql_session.commit()
  262. except:
  263. pass
  264. class ad_wechat_info(BaseHandler):
  265. # 1.公众号相关信息获取
  266. def get(self):
  267. try:
  268. # TODO:添加分页,
  269. # 公众号,服务商,唯一id设计或者获取
  270. # 0.是否刷新
  271. # 1.获取userid,以及是否刷新
  272. user_id = self.get_argument("user_id", None)
  273. is_refresh = self.get_argument("is_refresh", None)
  274. print(user_id, is_refresh)
  275. if user_id is None or is_refresh is None:
  276. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  277. return
  278. sql_session = db.DBSession()
  279. # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
  280. if int(is_refresh) == 1:
  281. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  282. threading.Thread(target=user_action.get_human_info,
  283. args=(
  284. user_id, log_ad, db, cookie_canuse)).start()
  285. else:
  286. # 1.查看是否在刷新,
  287. # 在刷新中,
  288. # 返回正在刷新
  289. # -------不管上面逻辑让他们多刷新几次
  290. # 不在刷新
  291. # 返回对应数据
  292. # 2.获取userid对应数据
  293. result = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id)
  294. result_list = []
  295. for _ in result:
  296. service_name, wechat_name = _
  297. result_list.append({'service_name': service_name, 'wechat_name': wechat_name})
  298. print(result_list)
  299. self.write({'status': {'msg': 'success', "RetCode": 200},
  300. 'wechat_info': result_list})
  301. except Exception as e:
  302. try:
  303. sql_session.commit()
  304. except:
  305. pass
  306. class delete_ad_layout(BaseHandler):
  307. def get(self):
  308. user_id = self.get_argument('user_id', None)
  309. layout_name = self.get_argument('layout_name', None)
  310. sql_session = db.DBSession()
  311. if user_id is None or layout_name is None:
  312. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  313. return
  314. # 落地页名字精确到毫秒,默认是全局唯一
  315. sql_tools.delete_layout_typesetting_vir(sql_session=sql_session, user_id=user_id,
  316. typesetting_name=layout_name)
  317. self.write({'status': {'msg': 'success', "RetCode": 200}})
  318. class delete_ad_plan(BaseHandler):
  319. def get(self):
  320. user_id = self.get_argument('user_id', None)
  321. plan_name = self.get_argument('plan_name', None)
  322. service_name = self.get_argument('service_name', None)
  323. wechat_name = self.get_argument('wechat_name', None)
  324. sql_session = db.DBSession()
  325. if user_id is None or plan_name is None:
  326. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  327. return
  328. # 落地页名字精确到毫秒,默认是全局唯一
  329. sql_tools.delete_ad_plan_typesetting_vir(sql_session=sql_session, user_id=user_id,
  330. typesetting_name=plan_name, wechat_name=wechat_name,
  331. service_name=service_name)
  332. self.write({'status': {'msg': 'success', "RetCode": 200}})
  333. class get_ad_wechat_service_name(BaseHandler):
  334. def get(self):
  335. user_id = self.get_argument('user_id', None)
  336. sql_session = db.DBSession()
  337. if user_id is None:
  338. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  339. return
  340. result = sql_tools.get_wechat_info_service_name(sql_session=sql_session, user_id=user_id)
  341. result_list = []
  342. for _ in result:
  343. service_name = _
  344. result_list.append({'service_name': service_name})
  345. print(result_list)
  346. self.write({'status': {'msg': 'success', "RetCode": 200},
  347. 'wechat_info': result_list})
  348. class get_ad_wechat_wechat_name(BaseHandler):
  349. def get(self):
  350. user_id = self.get_argument('user_id', None)
  351. service_name = self.get_argument('service_name', None)
  352. sql_session = db.DBSession()
  353. if user_id is None or service_name is None:
  354. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  355. return
  356. result = sql_tools.get_wechat_info_wechat_name(sql_session=sql_session, user_id=user_id,
  357. service_name=service_name)
  358. result_list = []
  359. for _ in result:
  360. service_name, wechat_name = _
  361. result_list.append({'service_name': service_name, 'wechat_name': wechat_name})
  362. print(result_list)
  363. self.write({'status': {'msg': 'success', "RetCode": 200},
  364. 'wechat_info': result_list})
  365. class get_plan_action_record(BaseHandler):
  366. def get(self):
  367. user_id = self.get_argument('user_id', None)
  368. service_name = self.get_argument('service_name', None)
  369. wechat_name = self.get_argument('wechat_name', None)
  370. status = self.get_argument('status', None)
  371. plan_name = self.get_argument('plan_name', None)
  372. sql_session = db.DBSession()
  373. if user_id is None:
  374. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  375. return
  376. # 落地页名字精确到毫秒,默认是全局唯一
  377. result = sql_tools.get_plan_record(sql_session=sql_session, user_id=user_id,
  378. service_name=service_name, wechat_name=wechat_name,
  379. status=status, plan_name=plan_name)
  380. result_ = []
  381. for i in range(len(result)):
  382. print(result[i])
  383. user_id, name, service_name, wechat_name, create_time, status, typesetting, wechat_id_info = result[i]
  384. _ = {}
  385. _['typesetting'] = json.loads(typesetting)
  386. _['ad_plan_name'] = name
  387. _['id'] = i
  388. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  389. _['service_name'] = service_name
  390. _['wechat_name'] = wechat_name
  391. _['wechat_id_info'] = wechat_id_info
  392. _['status'] = status
  393. result_.append(_)
  394. self.write({'status': {'msg': 'success', "RetCode": 200},
  395. 'local_ad_plan_info': result_})
  396. class get_all_ad_task(BaseHandler):
  397. def get(self):
  398. user_id = self.get_argument('user_id', None)
  399. sql_session = db.DBSession()
  400. if user_id is None:
  401. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  402. return
  403. # 落地页名字精确到毫秒,默认是全局唯一
  404. result = sql_tools.get_ad_task(sql_session=sql_session, user_id=user_id)
  405. task_dict = {}
  406. localtion = ['wechat', '']
  407. for _ in result:
  408. task_name, status, task_status_num, create_time, typesetting = _
  409. print(typesetting)
  410. typesetting = json.loads(typesetting)
  411. if typesetting['plan_base'][1] == 'pyq':
  412. localtion[1] = 'pyq'
  413. create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
  414. if task_name not in task_dict.keys():
  415. task_dict[task_name] = {}
  416. task_dict[task_name][status] = (task_status_num, create_time)
  417. result_ = []
  418. num = 0
  419. for k, v in task_dict.items():
  420. # TODO:修改为dict的sort
  421. sum_num = 0
  422. print(k, v)
  423. new_dict = {}
  424. create_time = None
  425. for k_, v_ in v.items():
  426. task_status_num, create_time = v_
  427. sum_num = sum_num + task_status_num
  428. new_dict[k_] = task_status_num
  429. status = 'todo' if 'todo' in new_dict.keys() else 'done'
  430. task_dict[k]['sum_num'] = sum_num
  431. new_dict['sum_num'] = sum_num
  432. result_.append(
  433. {'task_name': k, 'task_info': new_dict, 'create_time': create_time, 'channel': localtion[0],
  434. 'localtion': localtion[1], 'id': num, 'status': status})
  435. num = num + 1
  436. print(json.dumps(task_dict))
  437. self.write({'status': {'msg': 'success', "RetCode": 200},
  438. 'local_ad_plan_info': result_})
  439. def heart_jump():
  440. # TODO:tornado 心跳检测,下周做----线程不断检查,线程生命周期60分钟
  441. pass
  442. def make_app():
  443. return tornado.web.Application([
  444. ("/get_all_ad_task", get_all_ad_task), # 获取所有任务状态,
  445. ("/create_ad_plan", create_ad_plan), #
  446. ("/get_ad_wechat_service_name", get_ad_wechat_service_name),
  447. ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name),
  448. # ("/create_ad_plan_local", create_ad_plan_local),
  449. ("/create_ad_layout_local", create_ad_layout_local),
  450. ("/get_layout_local", get_ad_layout_local),
  451. ("/get_ad_plan_local", get_ad_plan_local),
  452. ("/delete_layout_local", delete_ad_layout),
  453. ("/delete_ad_plan_local", delete_ad_plan),
  454. # ("/create_ad_layout_remote", create_ad_layout_remote),
  455. ("/ad_human_info", ad_human_info),
  456. ("/ad_wechat_info", ad_wechat_info),
  457. ("/get_plan_action_record", get_plan_action_record),
  458. ], debug=True, autoreload=True)
  459. if __name__ == "__main__":
  460. import logging
  461. logging.basicConfig(
  462. handlers=[
  463. logging.handlers.RotatingFileHandler('./tornado.log',
  464. maxBytes=10 * 1024 * 1024,
  465. backupCount=5,
  466. encoding='utf-8')
  467. , logging.StreamHandler() # 供输出使用
  468. ],
  469. level=logging.INFO,
  470. format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
  471. )
  472. handler = logging.FileHandler('tornado.log')
  473. logger = logging.getLogger()
  474. logger.addHandler(handler)
  475. logger.setLevel(logging.INFO)
  476. app = make_app()
  477. app.listen(8888)
  478. tornado.ioloop.IOLoop.current().start()