ck 3 jaren geleden
bovenliggende
commit
dbe8f3566f

+ 78 - 63
app/api_data/platform_order/yuewen.py

@@ -2,6 +2,7 @@ import time
 from model.DateUtils import DateUtils
 from model.ComUtils import md5
 import requests
+from model.ComUtils import split_int
 from app.api_data.platform_order.order_util import save_order
 ut = DateUtils()
 
@@ -18,87 +19,101 @@ def get_yuewen_order_task(st, et, account):
         end_time = DateUtils.str_to_stamp(ut.get_n_minutes_ago(), "%Y-%m-%d %H:%M:%S")
     else:
         end_time = DateUtils.str_to_stamp(et) + 86399
-    page = 1
-    last_min_id = ''
-    last_max_id = ''
-    total_count = ''
-    last_page = ''
 
-    li = []
-    while True:
 
-        params = {
-            'email': email,
-            'timestamp': int(time.time()),
-            'start_time': start_time,
-            'end_time': end_time,
-            'page': page,
-            'version':1
-        }
+    for i in split_int(start_time,end_time,3600):
+        start = i[0]
+        end = i[1]
+        print(ut.stamp_to_str(end))
 
-        if page > 1:
-            params['last_min_id'] = last_min_id
-            params['last_max_id'] = last_max_id
-            params['total_count'] = total_count
-            params['last_page'] = last_page
 
-        sorted_data = sorted(params.items())
-        str_params = ''
-        for k, v in sorted_data:
-            str_params = str_params + str(k) + str(v)
+        page = 1
+        last_min_id = ''
+        last_max_id = ''
+        total_count = ''
+        last_page = ''
 
-        sign = md5(appsecert + str_params).upper()
 
-        # 放入签名
-        params['sign'] = sign
-        # print(params)
-        response_result_json = requests.get(url=url, params=params).json()
-        # print(response_result_json)
 
-        # print(len(response_result_json["data"]["list"]))
+        li = []
+        while True:
 
+            params = {
+                'email': email,
+                'timestamp': int(time.time()),
+                'start_time': start,
+                'end_time': end,
+                'page': page,
+                'version':1
+            }
 
+            if page > 1:
+                params['last_min_id'] = last_min_id
+                params['last_max_id'] = last_max_id
+                params['total_count'] = total_count
+                params['last_page'] = last_page
 
-        response_data = response_result_json['data']
-        total_count = response_data['total_count']
+            sorted_data = sorted(params.items())
+            str_params = ''
+            for k, v in sorted_data:
+                str_params = str_params + str(k) + str(v)
 
+            sign = md5(appsecert + str_params).upper()
 
-        last_min_id = response_data['min_id']
-        last_max_id = response_data['max_id']
-        last_page = response_data['page']
-        order_item_list = response_data['list']
+            # 放入签名
+            params['sign'] = sign
+            # print(params)
 
-        if len(order_item_list) == 0:
-            break
+            response_result_json = requests.get(url=url, params=params).json()
+            # print(response_result_json)
 
-        for i in order_item_list:
-            order_time = i["order_time"]
-            li.append((order_time[:10],
-                       '',
-                       '阅文',
-                       i['app_name'],
-                       i['channel_id'],
-                       i['openid'],
-                       i['order_time'],
-                       i['reg_time'],
-                       i['amount'],
-                       i['book_name'],
-                       i['order_id'],
-                       i['order_status']
+            # print(len(response_result_json["data"]["list"]))
 
-            ))
+            if not response_result_json.get('data'):
+                print(response_result_json)
+                raise
 
-        if len(order_item_list) < 100:
-            break
-        else:
-            page += 1
+            response_data = response_result_json['data']
+            total_count = response_data['total_count']
 
-    if len(li) > 0:
-        print(f"{email} 有订单{len(li)}")
-        save_order(li)
+
+            last_min_id = response_data['min_id']
+            last_max_id = response_data['max_id']
+            last_page = response_data['page']
+            order_item_list = response_data['list']
+
+            if len(order_item_list) == 0:
+                break
+
+            for i in order_item_list:
+                order_time = i["order_time"]
+                li.append((order_time[:10],
+                           '',
+                           '阅文',
+                           i['app_name'],
+                           i['channel_id'],
+                           i['openid'],
+                           i['order_time'],
+                           i['reg_time'],
+                           i['amount'],
+                           i['book_name'],
+                           i['order_id'],
+                           i['order_status']
+
+                ))
+
+            if len(order_item_list) < 100:
+                break
+            else:
+                page += 1
+
+        if len(li) > 0:
+            print(f"{email} 有订单{len(li)}")
+            save_order(li)
 
 
 
 if __name__ == '__main__':
-    a = "mqud82950@163.com,74ca754515fa253c8ab790603cebc2ee"
-    get_yuewen_order_task('2021-05-14', '2021-05-14', a.split(','))
+    a = "guangzhouliuqi2@sina.com,10ce1dd6ccb330a82b73701d1e78f518"
+    b = "mqud82950@163.com,74ca754515fa253c8ab790603cebc2ee"
+    get_yuewen_order_task('2021-05-28', '2021-05-28', a.split(','))

+ 46 - 0
app/etl/UserTags/UserTagsClean.py

@@ -0,0 +1,46 @@
+"""用户标签清洗"""
+from model.DataBaseUtils import CkUtils,MysqlUtils
+import pandas as pd
+import sys
+ck = CkUtils()
+db = MysqlUtils()
+
+def run():
+    """Aggregate recharge statistics per fan and write them to ``t_mp_fans``.
+
+    Reads per-(channel, user, platform) aggregates of orders with
+    ``status=2`` through ``ck``, attaches each channel's ``mp_id`` with a
+    left join on ``channel``, prints both frames, and hands the merged frame
+    to ``db.zx_test.dfUpdate2mysql`` keyed on ``mp_id`` / ``open_id``.
+    """
+    columns = ['channel','open_id','platform_book_name','recharge_count','recharge_money','last_recharge_time']
+    # NOTE(review): the trailing ``limit 100`` looks like a debugging cap -
+    # confirm before this job is run against the full data set.
+    query = """
+        select channel,
+            user_id open_id,
+            platform platform_book_name,
+                count(1) recharge_count,
+               sum(amount) recharge_money,
+               max(order_time) last_recharge_time
+
+        from order where status=2 
+        group by channel,user_id,platform limit 100"""
+
+    recharge_df = ck.getData_pd(query, col=columns)
+    print(recharge_df)
+
+    # Left join: rows whose channel has no account entry keep a missing mp_id.
+    merged = recharge_df.merge(get_qc_channel_info(), how='left', on='channel')
+    print(merged)
+
+    db.zx_test.dfUpdate2mysql(
+        merged,
+        't_mp_fans',
+        ['mp_id','open_id'],
+        ['recharge_count','recharge_money','last_recharge_time','platform_book_name'],
+    )
+
+
+def get_qc_channel_info():
+    """Return the ``mp_id`` / ``channel`` lookup frame read from ``t_mp_account``.
+
+    ``mp_id`` is the account ``id`` cast to a string and ``channel`` is the
+    account's ``nick_name``; the query goes through ``db.zx.getData_pd``.
+    """
+    return db.zx.getData_pd(
+        """select cast(id as char) mp_id,nick_name channel from t_mp_account"""
+    )
+
+
+
+if __name__ == '__main__':
+    run()
+    # get_qc_channel_info()

+ 0 - 0
app/etl/UserTags/__init__.py


+ 1 - 1
config/db_config.yaml

@@ -27,4 +27,4 @@ zx_test:
   host: 118.178.187.109
   user: root
   passwd: zxdev3306
-  db: zx-ads-test
+  db: zx-operation-test

+ 20 - 2
model/ComUtils.py

@@ -37,10 +37,28 @@ def split_list(li,range=2):
             break
     return data
 
+def split_int(st, et, range):
+    """Split the closed integer interval [st, et] into consecutive chunks.
+
+    Every chunk is inclusive on both ends and spans at most ``range``
+    integers; chunks do not overlap.  For example ``split_int(1, 10, 3)``
+    gives ``[(1, 3), (4, 6), (7, 9), (10, 10)]``.
+
+    :param st: first integer of the interval (inclusive)
+    :param et: last integer of the interval (inclusive)
+    :param range: maximum number of integers per chunk; must be positive
+    :returns: [(st1,et1),(st2,et2)..]; if a single chunk already reaches
+        ``et`` the one pair ``(st, et)`` is returned
+    :raises ValueError: if ``range`` is zero or negative
+    """
+    # A non-positive step never moves ``st`` forward, so without this guard
+    # the loop below would run forever while the result list keeps growing.
+    if range <= 0:
+        raise ValueError("range must be positive, got %r" % (range,))
+
+    li = []
+    while True:
+        next_int = st + range - 1
+        if next_int >= et:
+            # Last chunk: clamp its end to ``et`` and stop.
+            li.append((st, et))
+            break
+        li.append((st, next_int))
+        # The next chunk starts right after the one just emitted.
+        st = next_int + 1
+
+    return li
+
+
 
 if __name__ == '__main__':
-    a=split_list([1,2,3,4,5,6,7,8,9,10],3)
+    # a=split_list([1,2,3,4,5,6,7,8,9,10],3)
+    # print(a)
+    a = split_int(1,10,3)
     print(a)
 
 
-

+ 39 - 0
model/DataBaseOperation.py

@@ -412,6 +412,45 @@ class MysqlOperation:
                          flag=flag,
                          split=split)
 
+
+    def updateManyV2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
+        """Update ``tags`` columns of ``table`` for the given key rows only.
+
+        Rows are processed in batches of at most ``split``; each batch is
+        filtered down to the positions returned by
+        ``self._get_exist_keys_index`` and then passed to ``self.update_many``.
+
+        :param table: target table name
+        :param keys: key column names
+        :param tags: column names to update
+        :param tag_values: tuple, list or DataFrame of new values, one row per key row
+        :param key_values: key rows, positionally aligned with ``tag_values``
+        :param flag: forwarded unchanged to ``_get_exist_keys_index`` and ``update_many``
+        :param split: maximum number of rows handled per batch
+        """
+        # Only the type of ``tag_values`` is validated; ``key_values`` is not checked here.
         if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
             log.error('Type Error')
+            # NOTE(review): exit() terminates the interpreter, so the
+            # ``return`` below is presumably never reached - confirm intent.
             exit(-1)
             return
+        # Oversized input: recurse once per ``split``-sized slice, then stop.
         if len(tag_values) > split:
             length = len(tag_values)
             for i in range(0, length, split):
                 start, finish = i, i + split
                 self.updateManyV2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
             return
         if len(key_values) == 0 or len(tag_values) == 0:
             log.debug('update 0 rows')
             return
         tag_values = self._convert_to_list(tag_values)
         key_values = self._convert_to_list(key_values)
+        # Presumably rejects batches containing the same key twice - verify
+        # against _check_repeat_key.  Note the assert is skipped under ``-O``.
         assert self._check_repeat_key(key_values) == False
+        # Presumably the positions of key rows already present in ``table`` -
+        # verify against _get_exist_keys_index.
         exist_key_index = list(self._get_exist_keys_index(table, keys, key_values, flag))
+        # Keep keys and tag values aligned by selecting the same positions from both.
         update_keys = list(map(lambda x: key_values[x], exist_key_index))
         update_tags = list(map(lambda x: tag_values[x], exist_key_index))
+
         self.update_many(table=table,
                          keys=keys,
                          tags=tags,
                          tag_values=update_tags,
                          key_values=update_keys,
                          flag=flag,
                          split=split)
+
+    def dfUpdate2mysql(self, df, table, key, tag):
+        """Update ``table`` from a DataFrame via ``updateManyV2``.
+
+        :param df: frame holding both the key columns and the tag columns
+        :param table: target table name
+        :param key: list of key column names, selected from ``df`` as key rows
+        :param tag: list of column names to update, selected from ``df`` as values
+        """
+        tag_frame = df[tag]
+        key_frame = df[key]
+        self.updateManyV2(table, key, tag, tag_frame, key_frame)
+
     def insertorupdatemany_v3(self, df, table, keys, tags, flag=False, split=80):
         self.insertorupdatemany_v2(
             table=table,

+ 6 - 0
model/DataBaseUtils.py

@@ -121,6 +121,12 @@ class CkUtils:
         return self.client.execute(sql)
 
     def getData_pd(self,sql,col):
+        """
+        Run ``sql`` through ``self.execute`` and wrap the rows in a DataFrame.
+
+        :param sql: query text, passed unchanged to ``self.execute``
+        :param col: list of column names for the frame, e.g. ['a','b'];
+            presumably must match the selected columns in number and order
+        :return: pandas DataFrame built from the returned rows
+        """
         data=self.execute(sql)
         df=pd.DataFrame(data,columns=col)
         return df