Bladeren bron

MOD:状态记录

cxyu 3 jaren geleden
bovenliggende
commit
caa74da3ad
3 gewijzigde bestanden met toevoegingen van 236 en 145 verwijderingen
  1. 80 20
      web_module/tornado_api.py
  2. 111 124
      web_module/user_action.py
  3. 45 1
      wechat_action/sql_tools.py

+ 80 - 20
web_module/tornado_api.py

@@ -11,6 +11,7 @@ from web_module import user_action
 from sqlalchemy import Table
 import json
 import pickle
+from datetime import datetime
 
 # TODO:需要添加上supervisor,来维护进程
 # TODO:有时间需要对tornado进行改进
@@ -25,6 +26,9 @@ ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata,
                                   autoload=True, autoload_with=db.engine)
 action_record_table = Table('action_record', db.metadata,
                             autoload=True, autoload_with=db.engine)
+layout_create_action = 'create_ad_layout'
+ad_plan_create_action = 'create_ad_plan'
+refresh_wechat_action = 'refresh_wechat_info'
 
 
 # 1.实现本机服务
@@ -71,6 +75,14 @@ class create_ad_plan(BaseHandler):
     # TODO:需要与刷新用户cookie相关action 联动,---------正在刷新用户信息时,不能进行计划创建.反之同理
 
     # TODO:名字检查----只保留三种符号(.-_),中文字符长度一,数字字符长度二
+
+    @staticmethod
+    def check_task(user_id):
+        # TODO:检查是否同user_id的任务在跑中,有的话只保存任务.不做其他事情
+        sql_session = db.DBSession()
+        result = sql_tools.get_task_in_hand_num(user_id, sql_session)
+        return result
+
     def post(self):
         try:
             sql_session = db.DBSession()
@@ -99,10 +111,31 @@ class create_ad_plan(BaseHandler):
                     ad_plan_typesetting_info=ad_plan_typesetting_info,
                     table_ad_plan_typesetting=ad_plan_typesetting_table)
                 sql_session.execute(ad_plan_typesetting_inserte)
+            sql_session.commit()
 
+            # 2.2存行为记录
+            task_name = 'user_id: {user_id}  time:{time_sign} action:create_plan'.format(user_id=user_id,
+                                                                                         time_sign=datetime.now().strftime(
+                                                                                             "%Y-%m-%d, %H:%M:%S"))
+            for _ in ad_plan_list:
+                print(_)
+                for action_type in [layout_create_action, ad_plan_create_action]:
+                    object_name = _['title'] if action_type == 'create_ad_plan' else _['idea']['jump_type_page_type'][
+                        'layout_name']
+                    action_info = {'user_id': user_id, 'service_name': _['service_name'],
+                                   'wechat_name': _['wechat_name'],
+                                   'action_type': action_type, 'object_name': object_name, 'task_name': task_name,
+                                   'status': 'todo'}
+                    record_insert = sql_tools.save_action_record(action_record_info=action_info,
+                                                                 table_action_record=action_record_table)
+                    sql_session.execute(record_insert)
+            sql_session.commit()
             # 4.开始运行
-            threading.Thread(target=user_action.carry_plan,
-                             args=(user_id, ad_plan_list, log_ad, db, cookie_canuse)).start()
+            if not self.check_task(user_id=user_id):
+                threading.Thread(target=user_action.carry_plan,
+                                 args=(user_id, ad_plan_list, log_ad, db, cookie_canuse)).start()
+            else:
+                self.write({'status': {'msg': '', "RetCode": 200}})
         except Exception as e:
             self.write('eror')
             try:
@@ -110,6 +143,7 @@ class create_ad_plan(BaseHandler):
             except:
                 pass
 
+
 class get_ad_plan_local(BaseHandler):
     def get(self):
         user_id = self.get_argument('user_id', None)
@@ -246,8 +280,6 @@ class ad_human_info(BaseHandler):
         sql_session = db.DBSession()
 
         try:
-            # TODO:添加分页
-
             # 0.是否刷新
             # 1.获取userid,以及是否刷新
             user_id = self.get_argument("user_id", None)
@@ -262,10 +294,27 @@ class ad_human_info(BaseHandler):
             # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
             if int(is_refresh) == 1:
                 log_ad, cookie_canuse = self.refresh_wechat_cookies(self, user_id=user_id)
-                threading.Thread(target=user_action.get_human_info,
-                                 args=(
-                                     user_id, log_ad, db, cookie_canuse)).start()
-
+                task_name = 'user_id: {user_id}  time:{time_sign} action:refresh_wechat_info'.format(user_id=user_id,
+                                                                                                     time_sign=datetime.now().strftime(
+                                                                                                         "%Y-%m-%d, %H:%M:%S"))
+                # 行为记录
+                action_type = refresh_wechat_action
+                object_name = ''
+                service_name = ''
+                wechat_name = ''
+                action_info = {'user_id': user_id, 'service_name': service_name, 'wechat_name': wechat_name,
+                               'action_type': action_type, 'object_name': object_name, 'task_name': task_name,
+                               'status': 'todo'}
+                record_insert = sql_tools.save_action_record(action_record_info=action_info,
+                                                             table_action_record=action_record_table)
+                sql_session.execute(record_insert)
+                sql_session.commit()
+                if not create_ad_plan.check_task(user_id=user_id):
+                    threading.Thread(target=user_action.get_human_info,
+                                     args=(
+                                         user_id, log_ad, db, cookie_canuse)).start()
+                else:
+                    self.write({'status': {'msg': '', "RetCode": 200}})
             else:
                 # 1.查看是否在刷新,
                 #     在刷新中,
@@ -313,21 +362,31 @@ class ad_wechat_info(BaseHandler):
                 self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
                 return
             sql_session = db.DBSession()
-            # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期
             if int(is_refresh) == 1:
                 log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
-                threading.Thread(target=user_action.get_human_info,
-                                 args=(
-                                     user_id, log_ad, db, cookie_canuse)).start()
+                task_name = 'user_id: {user_id}  time:{time_sign} action:refresh_wechat_info'.format(user_id=user_id,
+                                                                                                     time_sign=datetime.now().strftime(
+                                                                                                         "%Y-%m-%d, %H:%M:%S"))
+                # 行为记录
+                action_type = refresh_wechat_action
+                object_name = ''
+                service_name = ''
+                wechat_name = ''
+                action_info = {'user_id': user_id, 'service_name': service_name, 'wechat_name': wechat_name,
+                               'action_type': action_type, 'object_name': object_name, 'task_name': task_name,
+                               'status': 'todo'}
+                record_insert = sql_tools.save_action_record(action_record_info=action_info,
+                                                             table_action_record=action_record_table)
+                sql_session.execute(record_insert)
+                sql_session.commit()
+                #检查有无其他任务在处理中,有则等待
+                if not create_ad_plan.check_task(user_id=user_id):
+                    threading.Thread(target=user_action.get_human_info,
+                                     args=(
+                                         user_id, log_ad, db, cookie_canuse)).start()
+                else:
+                    self.write({'status': {'msg': '', "RetCode": 200}})
             else:
-                # 1.查看是否在刷新,
-                #     在刷新中,
-                #       返回正在刷新
-
-                # -------不管上面逻辑让他们多刷新几次
-                #     不在刷新
-                #       返回对应数据
-                # 2.获取userid对应数据
                 result = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id)
                 result_list = []
                 for _ in result:
@@ -342,6 +401,7 @@ class ad_wechat_info(BaseHandler):
             except:
                 pass
 
+
 class delete_ad_layout(BaseHandler):
     def get(self):
         user_id = self.get_argument('user_id', None)

+ 111 - 124
web_module/user_action.py

@@ -8,123 +8,96 @@ from communication_tools import dingtalk
 import json
 import logging
 from datetime import datetime
+from wechat_action.login_ad import LogIn
 
+# 定位为:接受请求之后对应的线程处理
 layout_create_action = 'create_ad_layout'
 ad_plan_create_action = 'create_ad_plan'
 refresh_wechat_action = 'refresh_wechat_info'
 
 
-def run(user_id, log_ad, db, cookie_canuse):
-    # sql_session = db.DBSession()
-    # # 检查是否有已经开启的任务
-    # result = sql_tools.get_action_status(sql_session, user_id)
-    # # 有:    只将任务进行添加操作todo
-    # if result:
-    #     action_info = {'user_id': user_id, 'service_name': ''}
-    #
-    #     sql_tools.save_action_record()
-    # 无:    任务添加到数据库中doing
-
-    # 检查是否需要登录
-    # 任务开始
-    # 任务中间添加的----不断循环获取同userid,----doing,error,todo状态任务
-
-    # 关闭driver
-    pass
-
-
-def cookie_acion(db, log_ad, cookie_canuse, user_id):
-    wechat_cookies_table = Table('wechat_cookies', db.metadata,
-                                 autoload=True, autoload_with=db.engine)
-    sql_session = db.DBSession()
-    # 等待页面加载完成
-    log_ad.log_in_wait()
-    # 1.保存cookie,
-    if not cookie_canuse:
-        log_ad.cookies_save(log_ad)
-        print('update wechat cookies')
-
-
-def carry_plan(user_id, ad_plan_list, log_ad, db, cookie_canuse):
-    # TODO:与get_human_info的交互需要改动
+def carry_plan(user_id, ad_plan_list, log_ad, db, cookie_canuse, task_name):
     # TODO:失败后 ,状态为:创建中  ----进行对应删除
 
     # 创建中的数据需要进行删除
-
-    # TODO:
-    #  1.任务有不断中途新增,该如何操作.------只管
-    #  2.落地页创建一半失败了怎么办
+    sql_session = db.DBSession()
 
     try:
         action_record_table = Table('action_record', db.metadata,
                                     autoload=True, autoload_with=db.engine)
-        sql_session = db.DBSession()
-        task_name = 'user_id: {user_id}  time:{time_sign} action:create_plan'.format(user_id=user_id,
-                                                                                     time_sign=datetime.now().strftime(
-                                                                                         "%Y-%m-%d, %H:%M:%S"))
-        # 2.2存行为记录
-        for _ in ad_plan_list:
-            print(_)
-            for action_type in ['create_ad_plan', 'create_ad_layout']:
-                object_name = _['title'] if action_type == 'create_ad_plan' else _['idea']['jump_type_page_type'][
-                    'layout_name']
-                object_name = object_name
-                action_info = {'user_id': user_id, 'service_name': _['service_name'], 'wechat_name': _['wechat_name'],
-                               'action_type': action_type, 'object_name': object_name, 'task_name': task_name,
-                               'status': 'todo'}
-                record_insert = sql_tools.save_action_record(action_record_info=action_info,
-                                                             table_action_record=action_record_table)
-                sql_session.execute(record_insert)
-        sql_session.commit()
 
         # cookies保存
-        cookie_acion(db, log_ad, cookie_canuse, user_id)
-
+        if not cookie_canuse:
+            log_ad.cookies_save(log_ad)
         for _ in ad_plan_list:
             service_name = _['service_name']
             wechat_name = _['wechat_name']
-            # 1.检查1.落地页是否创建过了
-            log_ad.select_ad_master(service_name, wechat_name)
-            # 现在默认layout_name在30个字符以内
-            layout_name = _['idea']['jump_type_page_type']['layout_name']
-            if CreateAd.check_sucess_api(layout_name=layout_name, log_ad=log_ad):
-                res = {'sucess': True, 'result_info': '已经创建过对应落地页'}
-                sql_tools.action_record(res, sql_session, layout_create_action, user_id, layout_name,
+            plan_name = _['title']
+            try:
+                # 1.检查1.落地页是否创建过了
+                log_ad.select_ad_master(service_name, wechat_name)
+                # 现在默认layout_name在30个字符以内
+                layout_name = _['idea']['jump_type_page_type']['layout_name']
+                if CreateAd.check_sucess_api(layout_name=layout_name, log_ad=log_ad):
+                    res = {'sucess': True, 'result_info': '已经创建过对应落地页'}
+                    sql_tools.action_record(res, sql_session, layout_create_action, user_id, layout_name,
+                                            action_record_table,
+                                            service_name, wechat_name, task_name)
+
+                else:
+                    # 1.5无则创建落地页
+                    try:
+                        create_ad_layout = CreateAd(login_ad=log_ad, service_name=service_name, wechat_name=wechat_name)
+                        layout_typesetting_dict = sql_tools.get_layout_typesetting(sql_session, user_id,
+                                                                                   typesetting_name=layout_name)
+                        print(layout_typesetting_dict)
+                        layout_typesetting_dict = json.loads(layout_typesetting_dict)
+                        res = create_ad_layout.create_layout(layout_typesetting_dict)
+                        sql_tools.action_record(res, sql_session, layout_create_action, user_id, layout_name,
+                                                action_record_table,
+                                                service_name, wechat_name, task_name)
+                    except Exception as e:
+                        res = {'sucess': False, 'result_info': str(e)}
+                        sql_tools.action_record(res, sql_session, layout_create_action, user_id, plan_name,
+                                                action_record_table,
+                                                service_name, wechat_name, task_name)
+                        continue
+                log_ad.refresh_driver()
+                # 3.创建计划
+                log_ad.select_ad_master(service_name, wechat_name)
+                # plan_typesetting_dict = sql_tools.get_ad_plan_typesetting(sql_session=sql_session, user_id=user_id,
+                #                                                           typesetting_name=plan_name)
+                create_ad_plan = CreateAdPlan(login_ad=log_ad, task=_, service_name=service_name,
+                                              wechat_name=wechat_name)
+                res = create_ad_plan.run()
+                # 4.更新action_record相关计划信息
+                sql_tools.action_record(res, sql_session, ad_plan_create_action, user_id, plan_name,
                                         action_record_table,
                                         service_name, wechat_name, task_name)
-
-            else:
-                # 1.5无则创建落地页
-                create_ad_layout = CreateAd(login_ad=log_ad, service_name=service_name, wechat_name=wechat_name)
-                layout_typesetting_dict = sql_tools.get_layout_typesetting(sql_session, user_id,
-                                                                           typesetting_name=layout_name)
-                print(layout_typesetting_dict)
-                layout_typesetting_dict = json.loads(layout_typesetting_dict)
-                res = create_ad_layout.create_layout(layout_typesetting_dict)
-                sql_tools.action_record(res, sql_session, layout_create_action, user_id, layout_name,
+                logging.info('创建计划任务结束')
+                log_ad.refresh_driver()
+            except Exception as e:
+                log_ad.driver.save_screenshot(
+                    'user_id:{}_time_{}_plan_name:{}_plan_error.png'.format(user_id, datetime.now().strftime(
+                        "%Y-%m-%d, %H:%M:%S"), plan_name))
+                logging.error(str(e))
+                log_ad.refresh_driver()
+                res = {'sucess': False, 'result_info': str(e)}
+                sql_tools.action_record(res, sql_session, ad_plan_create_action, user_id, plan_name,
                                         action_record_table,
                                         service_name, wechat_name, task_name)
-            log_ad.refresh_driver()
-            # 3.创建计划
-            log_ad.select_ad_master(service_name, wechat_name)
-            plan_name = _['title']
-            # plan_typesetting_dict = sql_tools.get_ad_plan_typesetting(sql_session=sql_session, user_id=user_id,
-            #                                                           typesetting_name=plan_name)
-            create_ad_plan = CreateAdPlan(login_ad=log_ad, task=_, service_name=service_name, wechat_name=wechat_name)
-            res = create_ad_plan.run()
-            # 4.更新action_record相关计划信息
-            sql_tools.action_record(res, sql_session, ad_plan_create_action, user_id, plan_name, action_record_table,
-                                    service_name, wechat_name, task_name)
-            logging.info('创建计划任务结束')
-            log_ad.refresh_driver()
-            # 每次运行微信相关操作,对微信相关信息进行刷新
-            get_human_info(user_id, log_ad, db, cookie_canuse)
+                raise
+
+        # 每次运行微信相关操作,对微信相关信息进行刷新
+        get_human_info(user_id, log_ad, db, cookie_canuse, task_name)
+
+        check_task_in_hand(user_id, db, log_ad)
+
     except Exception as e:
-        log_ad.driver.save_screenshot(
-            'user_id:{}_time_{}_plan_error.png'.format(user_id, datetime.now().strftime("%Y-%m-%d, %H:%M:%S")))
-        raise
+        sql_tools.update_task_status_error(sql_session=sql_session, user_id=user_id, task_name=task_name)
         dingtalk.send_message('user_id:{} 计划执行出错,进行检查\n{}'.format(user_id, str(e)))
         logging.error(e)
+        raise
     finally:
         try:
             print('任务结束')
@@ -133,33 +106,24 @@ def carry_plan(user_id, ad_plan_list, log_ad, db, cookie_canuse):
             pass
 
 
-def get_human_info(user_id, log_ad, db, cookie_canuse):
-    try:
-        # 数据库
-        action_record_table = Table('action_record', db.metadata,
-                                    autoload=True, autoload_with=db.engine)
-        human_info_table = Table('human_info', db.metadata,
-                                 autoload=True, autoload_with=db.engine)
-        wechat_info_table = Table('wechat_info', db.metadata,
-                                  autoload=True, autoload_with=db.engine)
-        sql_session = db.DBSession()
-
-        # 行为记录
-        task_name = 'user_id: {user_id}  time:{time_sign} action:refresh_wechat_info'.format(user_id=user_id,
-                                                                                             time_sign=datetime.now().strftime(
-                                                                                                 "%Y-%m-%d, %H:%M:%S"))
-
-        action_type = 'refresh_wechat_info'
-        action_info = {'user_id': user_id, 'service_name': '', 'wechat_name': '',
-                       'action_type': action_type, 'object_name': '', 'task_name': task_name,
-                       'status': 'todo'}
-        record_insert = sql_tools.save_action_record(action_record_info=action_info,
-                                                     table_action_record=action_record_table)
-        sql_session.execute(record_insert)
-        sql_session.commit()
+def get_human_info(user_id, log_ad, db, cookie_canuse, task_name):
+    # 数据库
+    action_record_table = Table('action_record', db.metadata,
+                                autoload=True, autoload_with=db.engine)
+    human_info_table = Table('human_info', db.metadata,
+                             autoload=True, autoload_with=db.engine)
+    wechat_info_table = Table('wechat_info', db.metadata,
+                              autoload=True, autoload_with=db.engine)
+    sql_session = db.DBSession()
 
+    # 行为记录对应参数
+    object_name = ''
+    service_name = ''
+    wechat_name = ''
+    try:
         # 1.cookies保存
-        cookie_acion(db, log_ad, cookie_canuse, user_id)
+        if not cookie_canuse:
+            log_ad.cookies_save(log_ad)
 
         # wechat_info.每次都删除掉前面全部数据,进行更新
         # human_info 进行全局更新
@@ -169,9 +133,6 @@ def get_human_info(user_id, log_ad, db, cookie_canuse):
         if not res_info['sucess']:
             dingtalk.send_message('user_id:{} 获取微信数据为空,进行检查'.format(user_id))
             log_ad.driver.close()
-            object_name = ''
-            service_name = ''
-            wechat_name = ''
             sql_tools.action_record(res_info, sql_session, refresh_wechat_action, user_id,
                                     object_name, action_record_table,
                                     service_name, wechat_name, task_name)
@@ -215,21 +176,26 @@ def get_human_info(user_id, log_ad, db, cookie_canuse):
 
                 sql_session.execute(human_insert)
                 sql_session.commit()
-            print('update human info')
+            logging.info('update human info')
 
         # 浏览器关闭
         log_ad.driver.close()
-        object_name = ''
-        service_name = ''
-        wechat_name = ''
+
         sql_tools.action_record(res_info, sql_session, refresh_wechat_action,
                                 user_id, object_name, action_record_table,
                                 service_name, wechat_name, task_name)
+        check_task_in_hand(user_id=user_id, db=db, log_ad=log_ad)
     except Exception as e:
+        res_info = {'sucess': False, 'result_info': str(e)}
+        sql_tools.action_record(res_info, sql_session, refresh_wechat_action,
+                                user_id, object_name, action_record_table,
+                                service_name, wechat_name, task_name)
+
         log_ad.driver.save_screenshot(
             'user_id:{}_time_{}_wechat_info_error.png'.format(user_id, datetime.now().strftime("%Y-%m-%d, %H:%M:%S")))
         dingtalk.send_message('user_id:{} 获取微信数据出错,进行检查\n{}'.format(user_id, str(e)))
         logging.error(e)
+        raise
     finally:
         try:
             print('任务结束')
@@ -238,5 +204,26 @@ def get_human_info(user_id, log_ad, db, cookie_canuse):
             pass
 
 
+def check_task_in_hand(user_id, db, log_ad):
+    sql_session = db.DBSession()
+    lines = sql_tools.get_task_in_hand_limit_one(user_id=user_id, sql_session=sql_session)
+
+    task_name = ''
+    ad_plan_list = []
+    action_type = ''
+    for _ in lines:
+        typesetting, wechat_name, service_name, action_type, task_name = _
+        ad_plan_list.append(json.loads(typesetting))
+    if action_type == refresh_wechat_action:
+        # 刷新log_ad,以免log_ad超过生命周期
+        log_ad.driver.quit()
+        log_ad = LogIn(user_id=user_id)
+        get_human_info(user_id, log_ad, db, True, task_name)
+    if action_type == ad_plan_create_action:
+        log_ad.driver.quit()
+        log_ad = LogIn(user_id=user_id)
+        carry_plan(user_id, ad_plan_list, log_ad, db, True, task_name)
+
+
 if __name__ == "__main__":
     pass

+ 45 - 1
wechat_action/sql_tools.py

@@ -224,6 +224,50 @@ def get_undo_action(sql_session, user_id):
     return result_list
 
 
+def get_task_in_hand_num(user_id, sql_session):
+    sql = '''
+    select count(*) from 
+    action_record ar 
+    where timestampdiff(minute ,update_time ,current_timestamp )<60
+    and status ='todo' and user_id='{}'
+    '''.format(user_id)
+    cursor = sql_session.execute(sql)
+    lines = cursor.fetchall()
+    result = lines[0][0]
+    return result
+
+
+def get_task_in_hand_limit_one(user_id, sql_session):
+    sql = '''
+    select foo.typesetting,foo.wechat_name ,foo.service_name ,foo.action_type ,foo2.task_name from 
+    (select ad.typesetting,ar.wechat_name,ar.action_type ,ar.service_name ,
+    ar.task_name from action_record ar join ad_plan_typesetting ad
+    on ar.object_name = ad.name 
+    where action_type in ('create_ad_plan' ,'refresh_wechat_info')
+    and status ='todo'
+    and timestampdiff(minute ,ad.update_time ,current_timestamp )<60
+    and ad.user_id ='{}') as foo
+    join
+    (select task_name from action_record ar 
+    where status ='todo' and action_type in ('create_ad_plan' ,'refresh_wechat_info')
+    order by create_time 
+    limit 1) as foo2 on foo.task_name=foo2.task_name
+    '''.format(user_id)
+    cursor = sql_session.execute(sql)
+    lines = cursor.fetchall()
+    return lines
+
+
+def update_task_status_error(sql_session, user_id, task_name):
+    sql = '''
+    update action_record 
+    set status ='error'
+    where status ='todo' and user_id ='{}' and task_name ='{}'
+    '''.format(user_id, task_name)
+    sql_session.execute(sql)
+    sql_session.commit()
+
+
 def get_ad_task(sql_session, user_id):
     sql = '''
     select task_name,status,count(*),min(ar.create_time),apt.typesetting from action_record ar
@@ -265,7 +309,7 @@ def get_action_status(sql_session, user_id):
 
 
 def action_record(res, sql_session, action_type, user_id, object_name, action_table, service_name,
-                  wechat_name,task_name):
+                  wechat_name, task_name):
     print('get in action record ', service_name, wechat_name, res)
     status = 'done' if res['sucess'] else 'error'
     print(action_type)