ck 4 年 前
コミット
ba9c1ab64d

+ 61 - 139
app/api_data/get_order.py

@@ -2,152 +2,74 @@ import requests
 import time
 import time
 import hashlib
 import hashlib
 import json
 import json
-from model.ComUtils import ComUtils
+from app.api_data.order_util import *
 from model.DataBaseUtils import MysqlUtils
 from model.DataBaseUtils import MysqlUtils
 from model.DateUtils import DateUtils
 from model.DateUtils import DateUtils
+from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 db=MysqlUtils()
 db=MysqlUtils()
-class OrderAccount():
-
-    def get_account(self,plactform):
-        data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}'")
-        new_data = []
-        for i in data:
-            new_data.append(i[0].replace('\n', '').split(","))
-        return new_data
-
-    def get_yg_acccount(self):
-        return self.get_account("阳光")
-
-    def get_wd_account(self):
-        return self.get_account("文鼎")
-
-
-class GetOrderData(ComUtils,OrderAccount):
-    def yg(self,start,end):
-        client_id = 10008097
-        token = '2xa1d55tTPBjeEA8Ho'
-        for i in self.get_yg_acccount():
-            stage = i[0]
-            vip_id = i[1]
-            print(vip_id)
-            self.get_yg_vip_channel(stage,vip_id,client_id,token)
-            self.get_yg_data(stage, vip_id, client_id, token,start,end)
-        self.yg_prase()
-
-    def yg_prase(self):
-
-        while True:
-
-            a = db.quchen_text.getOne("select count(1) from yangguang_path where update_time is null")
-            print(a)
-            if a==0:
-                break
-            time.sleep(60)
-
-
-        for i in self.get_yg_acccount():
-            vip_id = i[1]
-            self.parse_yg_data(vip_id)
-
-
-    def get_yg_data(self,stage,vip_id,client_id,token,start,end):
-
-        url = 'https://data.yifengaf.cn:443/channeldata/data/orders/list'
-        nonce=self.get_random_str()
-        timestamp=int(time.time())
-        signaure =self.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
-        params = {
-            "client_id": client_id,
-            "token": token,
-            "nonce": nonce,
-            "timestamp": timestamp,
-            "signaure": signaure,
-            "vip_id": vip_id,
-            "start_time":start,   # %Y-%m-%d %H:%i:%s:
-            "end_time":end
-        }
-
-        headers={"Content-Type":"application/json"}
-        r=requests.post(url=url,data=json.dumps(params),headers=headers)
-        print(r.text)
-        task_id = json.loads(r.text).get("data").get("task_id")
-        db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')")
-
-
-    def get_yg_vip_channel(self,stage,vip_id,client_id,token):
-        url='https://data.yifengaf.cn:443/channeldata/data/account/list'
-        nonce = self.get_random_str()
-        timestamp = int(time.time())
-        signaure = self.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
-        params = {
-            "client_id": client_id,
-            "token": token,
-            "nonce": nonce,
-            "timestamp": timestamp,
-            "signaure": signaure,
-            "vip_id": vip_id,
-        }
-        headers = {"Content-Type": "application/json"}
-        r=requests.post(url=url,data=json.dumps(params),headers=headers)
-        print(r.text)
-        task_id= json.loads(r.text).get("data").get("task_id")
-        db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')")
-
-    def parse_yg_data(self,vip_id):
-        url=db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ")
-        r= requests.get(url).text
-        channel_di={}
-        a = r.split('}')
-        for i in a[:-1]:
-            if i[-1] != '}':
-                b=json.loads(i + "}", strict=False)
-
-            else:
-                b=json.loads(i, strict=False)
-            channel_di[b["channel_id"]]=b["wx_nickname"]
-        print(channel_di)
-
-        info=db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}")
-        stage=info[0][0]
-        path=info[0][1]
-        text=requests.get(path).text.replace('"referral_url":,','')
-
-        insert_data=[]
-        for j in text.split("}")[:-1]:
-            if j[-1] != '}':
-                j=j+'}'
-            try:
-                di=json.loads(j, strict=False)
-            except Exception as e:
-                print(j)
-                print(e)
-
-            if di["state"] == "未完成":
-                continue
-            platform = "阳光"
-            channel_id = di["channel_id"]
-            channel=channel_di[channel_id]
-
-            user_id = di["openid"]
-            order_time = di["create_time"]
-            reg_time = di["user_createtime"]
-            from_novel = di["book_name"]
-            amount = di["money"]
-            order_id = di["transaction_id"]
-            date = DateUtils.str_to_stamp(order_time[:10])
-            insert_data.append((date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id))
-        # print(insert_data)
-        db.quchen_text.executeMany("replace into `order`(date,stage,platform,channel,channel_id,"
-                               "user_id,order_time,reg_time,amount,from_novel,order_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",tuple(insert_data))
+
+
+def get_account(plactform):
+    data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}'")
+    new_data = []
+    for i in data:
+        new_data.append(i[0].replace('\n', '').split(","))
+    return new_data
+
+
+def yangguang(start=None, end=None):
+
+    if start:
+        start = start+' 00:00:00'
+        end = end+' 23:59:59'
+    else:
+        start = du.getTodayOrYestoday() + ' 00:00:00'
+        end = du.get_n_hours_ago(0)
+
+    client_id = 10008097
+    token = '2xa1d55tTPBjeEA8Ho'
+    accounts=get_account("阳光")
+
+    for i in accounts:
+        stage = i[0]
+        vip_id = i[1]
+        print(vip_id)
+        get_yg_vip_channel(stage, vip_id, client_id, token)
+        get_yg_data(stage, vip_id, client_id, token, start, end)
+
+    while True:
+        a = db.quchen_text.getOne("select count(1) from yangguang_path where update_time is null")
+        print(f" vip 待处理数量 {a} 正在等待数据回调")
+        if a == 0:
+            break
+        time.sleep(60)
+
+    for i in accounts:
+        vip_id = i[1]
+        parse_yg_data(vip_id)
+
+
+def huasheng(start=None,end=None):
+    if start is None:
+        start = end = du.getTodayOrYestoday()
+
+    executor = ThreadPoolExecutor(max_workers=5)
+    accounts = get_account("花生")
+    li = []
+    for account in accounts:
+        channel_data = get_hs_channel(account)
+        if not channel_data:
+            continue
+        for merchant in channel_data:
+            executor.submit(get_huasheng_order_task, start, end, account, merchant,li)
+    executor.shutdown(True)
+    save_hs_data(li)
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    a=GetOrderData()
-    a.yg('2020-11-01 00:00:00','2020-12-29 00:00:00')
-    # a.parse_yg_data(17382)
 
 
-    # a.get_yg_data('趣程15期','17382','10008097','2xa1d55tTPBjeEA8Ho','2020-11-27','2020-12-28')
-    # a.get_yg_vip_channel('趣程15期','17382','10008097','2xa1d55tTPBjeEA8Ho')
+    # yangguang('2021-01-16','2021-01-18')
+    huasheng('2021-01-18','2021-01-18')
 
 
 
 
     """要是只跑一个账号 把 get_yg_acccount() 里面的sql where 条件加上 id=xxx"""
     """要是只跑一个账号 把 get_yg_acccount() 里面的sql where 条件加上 id=xxx"""

+ 5 - 6
app/api_data/order_hourly.py

@@ -1,4 +1,4 @@
-from app.api_data.get_order import GetOrderData
+from app.api_data.get_order import *
 from model.DateUtils import DateUtils
 from model.DateUtils import DateUtils
 import time
 import time
 du=DateUtils()
 du=DateUtils()
@@ -6,12 +6,11 @@ du=DateUtils()
 """每小时5分跑"""
 """每小时5分跑"""
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    start=du.getTodayOrYestoday()+ ' 00:00:00'
-    end=du.get_n_hours_ago(0)
-    print(start,end)
+    yangguang()
+    huasheng()
+
+
 
 
-    a=GetOrderData()
-    a.yg(start,end)
 
 
 
 
 
 

+ 201 - 0
app/api_data/order_util.py

@@ -0,0 +1,201 @@
+import time
+from model import ComUtils
+import requests
+import json
+from model.DataBaseUtils import MysqlUtils
+from model.DateUtils import DateUtils
+from model.ComUtils import *
+import math
+from model.DateUtils import DateUtils
+db=MysqlUtils()
+du=DateUtils()
+
+
+def get_yg_vip_channel(stage, vip_id, client_id, token):
+    url = "https://data.yifengaf.cn:443/channeldata/data/account/list"
+    nonce = ComUtils.get_random_str()
+    timestamp = int(time.time())
+    signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
+    params = {
+        "client_id": client_id,
+        "token": token,
+        "nonce": nonce,
+        "timestamp": timestamp,
+        "signaure": signaure,
+        "vip_id": vip_id,
+    }
+    headers = {"Content-Type": "application/json"}
+    r = requests.post(url=url, data=json.dumps(params), headers=headers)
+    print(r.text)
+    task_id = json.loads(r.text).get("data").get("task_id")
+    db.quchen_text.execute(
+        f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')")
+
+
+def get_yg_data(stage,vip_id,client_id,token,start,end):
+    url = "https://data.yifengaf.cn:443/channeldata/data/orders/list"
+    nonce=ComUtils.get_random_str()
+    timestamp=int(time.time())
+    signaure =ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
+    params = {
+        "client_id": client_id,
+        "token": token,
+        "nonce": nonce,
+        "timestamp": timestamp,
+        "signaure": signaure,
+        "vip_id": vip_id,
+        "start_time":start,   # %Y-%m-%d %H:%i:%s:
+        "end_time":end
+    }
+    headers={"Content-Type":"application/json"}
+    r=requests.post(url=url,data=json.dumps(params),headers=headers)
+    print(r.text)
+    task_id = json.loads(r.text).get("data").get("task_id")
+    db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')")
+
+
+def parse_yg_data(vip_id):
+    url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ")
+    r = requests.get(url).text
+    channel_di={}
+    a = r.split('}')
+    for i in a[:-1]:
+        if i[-1] != '}':
+            b=json.loads(i + "}", strict=False)
+
+        else:
+            b=json.loads(i, strict=False)
+        channel_di[b["channel_id"]]=b["wx_nickname"]
+    print(channel_di)
+
+    info=db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}")
+    stage=info[0][0]
+    path=info[0][1]
+    text=requests.get(path).text.replace('"referral_url":,','')
+
+    insert_data=[]
+    for j in text.split("}")[:-1]:
+        if j[-1] != '}':
+            j=j+'}'
+        try:
+            di=json.loads(j, strict=False)
+        except Exception as e:
+            print(j)
+            print(e)
+
+        if di["state"] == "未完成":
+            continue
+        platform = "阳光"
+        channel_id = di["channel_id"]
+        channel=channel_di[channel_id]
+
+        user_id = di["openid"]
+        order_time = di["create_time"]
+        reg_time = di["user_createtime"]
+        from_novel = di["book_name"]
+        amount = di["money"]
+        order_id = di["transaction_id"]
+        date = DateUtils.str_to_stamp(order_time[:10])
+        insert_data.append((date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id))
+    # print(insert_data)
+    db.quchen_text.executeMany("replace into `order`(date,stage,platform,channel,channel_id,"
+                           "user_id,order_time,reg_time,amount,from_novel,order_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",tuple(insert_data))
+
+
+def get_hs_channel(account):
+    url = 'https://vip.rlcps.cn/api/getMerchants'
+    apiKey = str(account[0])
+    apiSecurity = account[1]
+    timestamp = str(int(time.time()))
+    sign = md5(apiKey + timestamp + apiSecurity).upper()
+    params = {
+        'apiKey': apiKey,
+        'apiSecurity': apiSecurity,
+        'timestamp': timestamp,
+        'sign': sign
+    }
+    response_result_json = requests.post(url, params).json()
+    if 'data' not in response_result_json.keys():
+        print('花生账号【{apiKey}】本次请求数据异常,响应报文【{result}】'.format(apiKey=apiKey, result=response_result_json))
+        return
+    return response_result_json['data']
+
+def get_huasheng_order_task(start,end, account, merchant,li):
+
+    apiKey = str(account[0])
+    apiSecurity = account[1]
+    stage = account[2]
+    timestamp = str(int(time.time()))
+
+    order_url = 'https://vip.rlcps.cn/api/orderList'
+    merchant_id = merchant['merchant_id']
+    merchant_name = merchant['merchant_name']
+    limit = 500
+
+    for date in du.getDateLists(start,end):
+
+        page = 1
+        while True:
+            sign = md5(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper()
+            order_params = {
+                'apiKey': apiKey,
+                'apiSecurity': apiSecurity,
+                'timestamp': timestamp,
+                'date': date,
+                'merchant_id': merchant_id,
+                'sign': sign,
+                'page': page,
+                'limit': limit
+            }
+            r = requests.post(order_url, order_params)
+            # print(r.text)
+            response_result_json = r.json()
+
+            if 'data' not in response_result_json.keys():
+                print('花生账号【{key}】, 查询时间【{date}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据异常,响应报文【{result}】'
+                      .format(key=apiKey, date=date, merchant_id=merchant_id, merchant_name=merchant_name,
+                              result=response_result_json))
+                break
+
+            if len(response_result_json['data']) == 0:
+                break
+
+            total_count = response_result_json['count']
+            order_item_list = response_result_json['data']
+
+            for order_item in order_item_list:
+                if order_item['order_status'] == 1:  # 1为已支付
+                    order = {}
+                    order['user_id'] = order_item['openid']
+                    order['order_id'] = order_item['trans_id']
+                    order['order_time'] = order_item['pay_at']
+                    order['reg_time'] = order_item['join_at']
+                    order['channel'] = merchant_name
+                    order['channel_id'] = merchant_id
+                    order['platform'] = '花生'
+                    order['stage'] = stage
+                    order['from_novel'] = order_item['book_name']
+                    order['amount'] = order_item['amount']
+                    order["date"]=int(time.mktime(time.strptime(order_item['pay_at'][:10],"%Y-%m-%d")))
+                    order = sorted(order.items(), key=lambda item: item[0])
+                    order = dict(order)
+                    order = tuple(order.values())
+                    if order.__len__()>0:
+                        li.append(order)
+
+            if int(page) >= math.ceil(total_count / int(limit)):
+                break
+            page = page + 1
+
+
+def save_hs_data(data):
+    sql = 'replace INTO quchen_text.`order` ' \
+          '(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id)' \
+          ' VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
+    db.quchen_text.executeMany(sql,data)
+
+
+if __name__ == '__main__':
+    print(du.str_to_stamp('2021-01-18'))
+    a = time.strftime("%Y-%m-%d", time.localtime(1610899200))
+    print(a)

+ 17 - 15
model/ComUtils.py

@@ -1,21 +1,23 @@
 import hashlib
 import hashlib
 import random
 import random
 
 
-class ComUtils:
-    def md5(self,s):
-        md5 = hashlib.md5()
-        md5.update(s.encode("utf-8"))
-        return md5.hexdigest()
 
 
-    def sha1(self,s):
-        sha1 = hashlib.sha1()
-        sha1.update(s.encode("utf-8"))
-        return sha1.hexdigest()
+def md5(s):
+    md5 = hashlib.md5()
+    md5.update(s.encode("utf-8"))
+    return md5.hexdigest()
 
 
-    def get_random_str(self,num=5):
-        H = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
-        salt = ''
-        for i in range(num):
-            salt += random.choice(H)
-        return salt
+
+def sha1(s):
+    sha1 = hashlib.sha1()
+    sha1.update(s.encode("utf-8"))
+    return sha1.hexdigest()
+
+
+def get_random_str(num=5):
+    H = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
+    salt = ''
+    for i in range(num):
+        salt += random.choice(H)
+    return salt
 
 

+ 6 - 3
model/DataBaseOperation.py

@@ -67,13 +67,16 @@ class MysqlOperation:
 
 
 
 
 
 
-    def execute(self, sql):
+    def execute(self, sql,data=None):
         start = time.time()
         start = time.time()
-        self.cursor.execute(sql)
+        if data:
+            self.cursor.execute(sql,data)
+        else:
+            self.cursor.execute(sql)
         self.conn.commit()
         self.conn.commit()
         if MYSQL_DEBUG:
         if MYSQL_DEBUG:
 
 
-            log.info('sql: \n' + sql)
+            # log.info('sql: \n' + sql)
             log.info('sql cost: %s' % (time.time() - start))
             log.info('sql cost: %s' % (time.time() - start))
 
 
     def executeMany(self,sql,data):
     def executeMany(self,sql,data):