# tornado_api.py
import json
import logging
import pickle
import threading
from logging import handlers

import tornado.ioloop
import tornado.log
import tornado.web
from sqlalchemy import Table

from settings import using_config
from web_module import user_action
from wechat_action import sql_tools
from wechat_action.login_ad import LogIn
from wechat_action.sql_models import DB
# TODO: run the process under supervisor so it is kept alive
# TODO: improve the tornado setup when there is time
# TODO: a deployment toolchain is needed to keep production stable
  17. db = DB(config=using_config)
  18. wechat_cookies_table = Table('wechat_cookies', db.metadata,
  19. autoload=True, autoload_with=db.engine)
  20. layout_typesetting_table = Table('layout_typesetting', db.metadata,
  21. autoload=True, autoload_with=db.engine)
  22. ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata,
  23. autoload=True, autoload_with=db.engine)
  24. action_record_table = Table('action_record', db.metadata,
  25. autoload=True, autoload_with=db.engine)
# 1. Implement the local service
# 2. Implement the online docker-selenium service
  28. class BaseHandler(tornado.web.RequestHandler):
  29. def options(self):
  30. pass
  31. def set_default_headers(self):
  32. self.set_header('Access-Control-Allow-Origin', '*')
  33. self.set_header('Access-Control-Allow-Headers', '*')
  34. self.set_header('Access-Control-Max-Age', 1000)
  35. self.set_header('Content-type', '*')
  36. self.set_header('Access-Control-Allow-Methods', '*')
  37. class create_ad_plan_local(BaseHandler):
  38. def post(self):
  39. request_dict = json.loads(self.request.body, encoding='utf-8')
  40. user_id = request_dict['user_id']
  41. ad_plan_list = request_dict['plan_list']
  42. print(user_id, ad_plan_list)
  43. sql_session = db.DBSession()
  44. if user_id is None or ad_plan_list is None:
  45. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  46. return
  47. # 落地页名字精确到毫秒,默认是全局唯一
  48. for _ in ad_plan_list:
  49. ad_plan_name = _['title']
  50. ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name,
  51. 'typesetting': json.dumps(_, ensure_ascii=False)}
  52. ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info(
  53. ad_plan_typesetting_info=ad_plan_typesetting_info,
  54. table_ad_plan_typesetting=ad_plan_typesetting_table)
  55. sql_session.execute(ad_plan_typesetting_inserte)
  56. sql_session.commit()
  57. self.write({'status': {'msg': 'success', "RetCode": 200}})
  58. class create_ad_plan(BaseHandler):
  59. # TODO:只要tornado开着就不允许修改数据库,------想好之后上线如何操作
  60. # TODO:需要与刷新用户cookie相关action 联动,---------正在刷新用户信息时,不能进行计划创建.反之同理
  61. def post(self):
  62. try:
  63. sql_session = db.DBSession()
  64. request_dict = json.loads(self.request.body, encoding='utf-8')
  65. print(request_dict)
  66. user_id = request_dict['user_id']
  67. ad_plan_list = request_dict['plan_list']
  68. # 1.查看是否cookie可用
  69. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  70. # 2.数据存入数据库
  71. print(user_id, ad_plan_list)
  72. if user_id is None or ad_plan_list is None:
  73. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  74. return
  75. # 2.1存计划数据
  76. for _ in ad_plan_list:
  77. ad_plan_name = _['title'].replace(' ', '').replace('.','').replace(',','')[:29]
  78. ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name,
  79. 'typesetting': json.dumps(_, ensure_ascii=False)}
  80. print('typesetting_info')
  81. print(_)
  82. print(ad_plan_typesetting_info)
  83. ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info(
  84. ad_plan_typesetting_info=ad_plan_typesetting_info,
  85. table_ad_plan_typesetting=ad_plan_typesetting_table)
  86. sql_session.execute(ad_plan_typesetting_inserte)
  87. # 4.开始运行
  88. threading.Thread(target=user_action.carry_plan,
  89. args=(user_id, ad_plan_list, log_ad, db, cookie_canuse)).start()
  90. except Exception as e:
  91. self.write('eror')
  92. try:
  93. sql_session.commit()
  94. except:
  95. pass
  96. class get_ad_plan_local(BaseHandler):
  97. def get(self):
  98. user_id = self.get_argument('user_id', None)
  99. layout_name = self.get_argument('plan_name', None)
  100. sql_session = db.DBSession()
  101. if user_id is None:
  102. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  103. return
  104. # 落地页名字精确到毫秒,默认是全局唯一
  105. if layout_name:
  106. result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id,
  107. typesetting_name=layout_name)
  108. else:
  109. # TODO:之后修改一下,让其查询效率高点,like效率过低
  110. layout_name = ''
  111. result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id,
  112. typesetting_name=layout_name)
  113. print(result)
  114. result_ = []
  115. for i in range(len(result)):
  116. print(result[i])
  117. typesetting, name, create_time, update_time = result[i]
  118. _ = {}
  119. _['typesetting'] = json.loads(typesetting)
  120. _['ad_plan_name'] = name
  121. _['id'] = i
  122. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  123. _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S")
  124. result_.append(_)
  125. self.write({'status': {'msg': 'success', "RetCode": 200},
  126. 'local_ad_plan_info': result_})
  127. class create_ad_layout_local(BaseHandler):
  128. def post(self):
  129. # TODO:返回一个layout_name重复的一个信息
  130. request_dict = json.loads(self.request.body)
  131. user_id = request_dict['user_id']
  132. layout_typesetting = request_dict['layout_typesetting']
  133. layout_name = request_dict['layout_name'].replace(' ', '')[:29]
  134. print(user_id, layout_typesetting, layout_name)
  135. print('layout-typesetting', type(layout_typesetting), layout_typesetting)
  136. sql_session = db.DBSession()
  137. if user_id is None or layout_name is None or layout_typesetting is None:
  138. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  139. return
  140. # 落地页名字精确到毫秒,默认是全局唯一
  141. layout_typesetting_info = {'user_id': user_id, 'name': layout_name,
  142. 'typesetting': layout_typesetting}
  143. layout_typesetting_inserte = sql_tools.save_layout_typesetting_info(
  144. layout_typesetting_info=layout_typesetting_info,
  145. table_layout_typesetting=layout_typesetting_table)
  146. sql_session.execute(layout_typesetting_inserte)
  147. sql_session.commit()
  148. self.write({'status': {'msg': 'success', "RetCode": 200}})
  149. class get_ad_layout_local(BaseHandler):
  150. def get(self):
  151. user_id = self.get_argument('user_id', None)
  152. layout_name = self.get_argument('layout_name', None)
  153. sql_session = db.DBSession()
  154. if user_id is None:
  155. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  156. return
  157. # 落地页名字精确到毫秒,默认是全局唯一
  158. if layout_name:
  159. result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id,
  160. typesetting_name=layout_name)
  161. else:
  162. # TODO:之后修改一下,让其查询效率高点,like效率过低
  163. layout_name = ''
  164. result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id,
  165. typesetting_name=layout_name)
  166. print(result)
  167. result_ = []
  168. for i in range(len(result)):
  169. print(result[i])
  170. typesetting, name, create_time, update_time = result[i]
  171. _ = {}
  172. _['typesetting'] = json.loads(typesetting)
  173. _['layout_name'] = name
  174. _['id'] = i
  175. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  176. _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S")
  177. result_.append(_)
  178. self.write({'status': {'msg': 'success', "RetCode": 200},
  179. 'local_layout_info': result_})
# TODO: the wechat_info and human_info tables need matching improvements when time allows
# TODO: the ad_human_info and ad_wechat_info actions need to coordinate with create_ad_plan
  182. class ad_human_info(BaseHandler):
  183. # TODO:设置一下update---table,如果失败了sql_session需要关闭
  184. @staticmethod
  185. def refresh_wechat_cookies(tornado_web, user_id):
  186. # TODO:添加互动接口,添加状态字段,打开selenium就变换
  187. # 1.返回二维码链接
  188. # ----1.查看cookie是否可用
  189. sql_session = db.DBSession()
  190. cookie_db = sql_tools.get_wechat_cookies(sql_session, user_id=user_id)
  191. # 进行登录操作
  192. log_ad = LogIn(user_id=user_id)
  193. # 使driver可以使用
  194. cookie_canuse = False
  195. if cookie_db:
  196. cookie_db = pickle.loads(cookie_db)
  197. if not log_ad.wechat_cookies_check_alive(cookie_db):
  198. # cookie 不能使用
  199. wechat_code = log_ad.log_in()
  200. tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
  201. 'wechat_code': wechat_code})
  202. print('cookie can not use')
  203. else:
  204. # cookie 可以继续使用
  205. cookie_canuse = True
  206. log_ad.driver.get('https://a.weixin.qq.com/index.html')
  207. tornado_web.write({'status': {'msg': 'success', "RetCode": 200}})
  208. else:
  209. # cookie 不能使用
  210. wechat_code = log_ad.log_in()
  211. tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
  212. 'wechat_code': wechat_code})
  213. return log_ad, cookie_canuse
  214. # 1.人群包获取
  215. def get(self):
  216. sql_session = db.DBSession()
  217. try:
  218. # TODO:添加分页
  219. # 0.是否刷新
  220. # 1.获取userid,以及是否刷新
  221. user_id = self.get_argument("user_id", None)
  222. human_package_name = self.get_argument('human_package_name', None)
  223. is_refresh = self.get_argument("is_refresh", None)
  224. wechat_name = self.get_argument('wechat_name', None)
  225. service_name = self.get_argument('service_name', None)
  226. print(user_id, is_refresh)
  227. if user_id is None or is_refresh is None or wechat_name is None or service_name is None:
  228. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  229. return
  230. # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
  231. if int(is_refresh) == 1:
  232. log_ad, cookie_canuse = self.refresh_wechat_cookies(self, user_id=user_id)
  233. threading.Thread(target=user_action.get_human_info,
  234. args=(
  235. user_id, log_ad, db, cookie_canuse)).start()
  236. else:
  237. # 1.查看是否在刷新,
  238. # 在刷新中,
  239. # 返回正在刷新
  240. # -------不管上面逻辑让他们多刷新几次
  241. # 不在刷新
  242. # 返回对应数据
  243. # 2.获取userid对应数据
  244. result = sql_tools.get_human_info(sql_session=sql_session,
  245. service_name=service_name, wechat_name=wechat_name)
  246. print(result)
  247. result = json.loads(result)
  248. if human_package_name:
  249. result = [_ for _ in result if human_package_name in _['name']]
  250. result_ = []
  251. for i in range(len(result)):
  252. _ = result[i]
  253. _['id'] = i
  254. result_.append(_)
  255. self.write({'status': {'msg': 'success', "RetCode": 200},
  256. 'human_info': result})
  257. except Exception as e:
  258. logging.error(str(e))
  259. try:
  260. sql_session.commit()
  261. except:
  262. pass
  263. class ad_wechat_info(BaseHandler):
  264. # 1.公众号相关信息获取
  265. def get(self):
  266. try:
  267. # TODO:添加分页,
  268. # 公众号,服务商,唯一id设计或者获取
  269. # 0.是否刷新
  270. # 1.获取userid,以及是否刷新
  271. user_id = self.get_argument("user_id", None)
  272. is_refresh = self.get_argument("is_refresh", None)
  273. print(user_id, is_refresh)
  274. if user_id is None or is_refresh is None:
  275. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  276. return
  277. sql_session = db.DBSession()
  278. # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
  279. if int(is_refresh) == 1:
  280. log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
  281. threading.Thread(target=user_action.get_human_info,
  282. args=(
  283. user_id, log_ad, db, cookie_canuse)).start()
  284. else:
  285. # 1.查看是否在刷新,
  286. # 在刷新中,
  287. # 返回正在刷新
  288. # -------不管上面逻辑让他们多刷新几次
  289. # 不在刷新
  290. # 返回对应数据
  291. # 2.获取userid对应数据
  292. result = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id)
  293. result_list = []
  294. for _ in result:
  295. service_name, wechat_name = _
  296. result_list.append({'service_name': service_name, 'wechat_name': wechat_name})
  297. print(result_list)
  298. self.write({'status': {'msg': 'success', "RetCode": 200},
  299. 'wechat_info': result_list})
  300. except Exception as e:
  301. try:
  302. sql_session.commit()
  303. except:
  304. pass
  305. class delete_ad_layout(BaseHandler):
  306. def get(self):
  307. user_id = self.get_argument('user_id', None)
  308. layout_name = self.get_argument('layout_name', None)
  309. sql_session = db.DBSession()
  310. if user_id is None or layout_name is None:
  311. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  312. return
  313. # 落地页名字精确到毫秒,默认是全局唯一
  314. sql_tools.delete_layout_typesetting_vir(sql_session=sql_session, user_id=user_id,
  315. typesetting_name=layout_name)
  316. self.write({'status': {'msg': 'success', "RetCode": 200}})
  317. class delete_ad_plan(BaseHandler):
  318. def get(self):
  319. user_id = self.get_argument('user_id', None)
  320. plan_name = self.get_argument('plan_name', None)
  321. service_name = self.get_argument('service_name', None)
  322. wechat_name = self.get_argument('wechat_name', None)
  323. sql_session = db.DBSession()
  324. if user_id is None or plan_name is None:
  325. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  326. return
  327. # 落地页名字精确到毫秒,默认是全局唯一
  328. sql_tools.delete_ad_plan_typesetting_vir(sql_session=sql_session, user_id=user_id,
  329. typesetting_name=plan_name, wechat_name=wechat_name,
  330. service_name=service_name)
  331. self.write({'status': {'msg': 'success', "RetCode": 200}})
  332. class get_ad_wechat_service_name(BaseHandler):
  333. def get(self):
  334. user_id = self.get_argument('user_id', None)
  335. sql_session = db.DBSession()
  336. if user_id is None:
  337. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  338. return
  339. result = sql_tools.get_wechat_info_service_name(sql_session=sql_session, user_id=user_id)
  340. result_list = []
  341. for _ in result:
  342. service_name = _
  343. result_list.append({'service_name': service_name})
  344. print(result_list)
  345. self.write({'status': {'msg': 'success', "RetCode": 200},
  346. 'wechat_info': result_list})
  347. class get_ad_wechat_wechat_name(BaseHandler):
  348. def get(self):
  349. user_id = self.get_argument('user_id', None)
  350. service_name = self.get_argument('service_name', None)
  351. sql_session = db.DBSession()
  352. if user_id is None or service_name is None:
  353. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  354. return
  355. result = sql_tools.get_wechat_info_wechat_name(sql_session=sql_session, user_id=user_id,
  356. service_name=service_name)
  357. result_list = []
  358. for _ in result:
  359. service_name, wechat_name = _
  360. result_list.append({'service_name': service_name, 'wechat_name': wechat_name})
  361. print(result_list)
  362. self.write({'status': {'msg': 'success', "RetCode": 200},
  363. 'wechat_info': result_list})
  364. class get_plan_action_record(BaseHandler):
  365. def get(self):
  366. user_id = self.get_argument('user_id', None)
  367. service_name = self.get_argument('service_name', None)
  368. wechat_name = self.get_argument('wechat_name', None)
  369. status = self.get_argument('status', None)
  370. plan_name = self.get_argument('plan_name', None)
  371. sql_session = db.DBSession()
  372. if user_id is None:
  373. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  374. return
  375. # 落地页名字精确到毫秒,默认是全局唯一
  376. result = sql_tools.get_plan_record(sql_session=sql_session, user_id=user_id,
  377. service_name=service_name, wechat_name=wechat_name,
  378. status=status, plan_name=plan_name)
  379. result_ = []
  380. for i in range(len(result)):
  381. print(result[i])
  382. user_id, name, service_name, wechat_name, create_time, status, typesetting, wechat_id_info = result[i]
  383. _ = {}
  384. _['typesetting'] = json.loads(typesetting)
  385. _['ad_plan_name'] = name
  386. _['id'] = i
  387. _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S")
  388. _['service_name'] = service_name
  389. _['wechat_name'] = wechat_name
  390. _['wechat_id_info'] = wechat_id_info
  391. _['status'] = status
  392. result_.append(_)
  393. self.write({'status': {'msg': 'success', "RetCode": 200},
  394. 'local_ad_plan_info': result_})
  395. class get_all_ad_task(BaseHandler):
  396. def get(self):
  397. user_id = self.get_argument('user_id', None)
  398. sql_session = db.DBSession()
  399. if user_id is None:
  400. self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
  401. return
  402. # 落地页名字精确到毫秒,默认是全局唯一
  403. result = sql_tools.get_ad_task(sql_session=sql_session, user_id=user_id)
  404. task_dict = {}
  405. localtion = ['wechat', '']
  406. for _ in result:
  407. task_name, status, task_status_num, create_time, typesetting = _
  408. print(typesetting)
  409. typesetting = json.loads(typesetting)
  410. if typesetting['plan_base'][1] == 'pyq':
  411. localtion[1] = 'pyq'
  412. create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
  413. if task_name not in task_dict.keys():
  414. task_dict[task_name] = {}
  415. task_dict[task_name][status] = (task_status_num, create_time)
  416. result_ = []
  417. num = 0
  418. for k, v in task_dict.items():
  419. # TODO:修改为dict的sort
  420. sum_num = 0
  421. print(k, v)
  422. new_dict = {}
  423. create_time = None
  424. for k_, v_ in v.items():
  425. task_status_num, create_time = v_
  426. sum_num = sum_num + task_status_num
  427. new_dict[k_] = task_status_num
  428. status = 'todo' if 'todo' in new_dict.keys() else 'done'
  429. task_dict[k]['sum_num'] = sum_num
  430. new_dict['sum_num'] = sum_num
  431. result_.append(
  432. {'task_name': k, 'task_info': new_dict, 'create_time': create_time, 'channel': localtion[0],
  433. 'localtion': localtion[1], 'id': num, 'status': status})
  434. num = num + 1
  435. print(json.dumps(task_dict))
  436. self.write({'status': {'msg': 'success', "RetCode": 200},
  437. 'local_ad_plan_info': result_})
  438. def heart_jump():
  439. # TODO:tornado 心跳检测,下周做----线程不断检查,线程生命周期60分钟
  440. pass
  441. def make_app():
  442. return tornado.web.Application([
  443. ("/get_all_ad_task", get_all_ad_task), # 获取所有任务状态,
  444. ("/create_ad_plan", create_ad_plan), #
  445. ("/get_ad_wechat_service_name", get_ad_wechat_service_name),
  446. ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name),
  447. # ("/create_ad_plan_local", create_ad_plan_local),
  448. ("/create_ad_layout_local", create_ad_layout_local),
  449. ("/get_layout_local", get_ad_layout_local),
  450. ("/get_ad_plan_local", get_ad_plan_local),
  451. ("/delete_layout_local", delete_ad_layout),
  452. ("/delete_ad_plan_local", delete_ad_plan),
  453. # ("/create_ad_layout_remote", create_ad_layout_remote),
  454. ("/ad_human_info", ad_human_info),
  455. ("/ad_wechat_info", ad_wechat_info),
  456. ("/get_plan_action_record", get_plan_action_record),
  457. ], debug=True, autoreload=True)
  458. if __name__ == "__main__":
  459. import logging
  460. logging.basicConfig(
  461. handlers=[
  462. logging.handlers.RotatingFileHandler('./tornado.log',
  463. maxBytes=10 * 1024 * 1024,
  464. backupCount=5,
  465. encoding='utf-8')
  466. , logging.StreamHandler() # 供输出使用
  467. ],
  468. level=logging.INFO,
  469. format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
  470. )
  471. handler = logging.FileHandler('tornado.log')
  472. logger = logging.getLogger()
  473. logger.addHandler(handler)
  474. logger.setLevel(logging.INFO)
  475. app = make_app()
  476. app.listen(8888)
  477. tornado.ioloop.IOLoop.current().start()