
MOD: material library rework (素材库整改)

cxyu committed 3 years ago
parent commit 8f39ee5807

+ 51 - 64
app/api_data/platform_order/order_util.py

@@ -10,13 +10,10 @@ from model.DateUtils import DateUtils
 import logging
 from urllib import parse
 from model.DingTalkUtils import DingTalkUtils
-# logging.getLogger().setLevel(logging.WARNING)
-db=MysqlUtils()
-du=DateUtils()
-
-
-
 
+# logging.getLogger().setLevel(logging.WARNING)
+db = MysqlUtils()
+du = DateUtils()
 
 
 def get_hs_order_task(start, end, account):
@@ -50,10 +47,8 @@ def get_hs_order_task(start, end, account):
         save_order(li)
 
 
-
-
-def get_huasheng_order(start,end, account, merchant):
-    li =[]
+def get_huasheng_order(start, end, account, merchant):
+    li = []
     apiKey = str(account[0])
     apiSecurity = account[1]
     stage = account[2]
@@ -64,7 +59,7 @@ def get_huasheng_order(start,end, account, merchant):
     merchant_name = merchant['merchant_name']
     limit = 500
 
-    for date in du.getDateLists(start,end):
+    for date in du.getDateLists(start, end):
 
         page = 1
         while True:
@@ -81,10 +76,9 @@ def get_huasheng_order(start,end, account, merchant):
             }
             r = requests.post(order_url, order_params)
             response_result_json = r.json()
-            if response_result_json['code']!=0:
+            if response_result_json['code'] != 0:
                 print(response_result_json)
-                DingTalkUtils().send('花生订单接口异常'+r.text)
-
+                DingTalkUtils().send('花生订单接口异常' + r.text)
 
             if 'data' not in response_result_json.keys():
                 print('花生账号【{key}】, 查询时间【{date}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据异常,响应报文【{result}】'
@@ -128,12 +122,9 @@ def get_huasheng_order(start,end, account, merchant):
 def save_hs_data(data):
     sql = 'replace INTO quchen_text.ods_order ' \
           ' VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
-    db.quchen_text.executeMany(sql,data)
-
-
-
-        # print(order_list)
+    db.quchen_text.executeMany(sql, data)
 
+    # print(order_list)
 
 
 def save_order(order_list):
@@ -142,16 +133,15 @@ def save_order(order_list):
                                values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", order_list)
     print("入库成功")
 
-def save_order2(order_list):
-
-
 
+def save_order2(order_list):
     db.quchen_text.executeMany("""replace into ods_order(date,stage,platform,channel,channel_id,user_id,
                                order_time,reg_time,amount,from_novel,order_id,status,
                                platform_user_id,wechat_app_id,book_tags,order_type,trade_no,transaction_no) 
                                values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", order_list)
     print("入库成功")
 
+
 def get_wd_account_siteid_list(account):
     url = 'https://bi.reading.163.com/dist-api/siteList'
 
@@ -196,12 +186,12 @@ def get_wd_account_siteid_list(account):
     return mpid_list
 
 
-def get_wending_json_object(url,params):
-    params['timestamp'] = int(time.time()*1000)
-    sorted_data = sorted(params.items(),reverse = False)
-    s=""
-    for k,v in sorted_data:
-      s = s+str(k)+"="+str(v)
+def get_wending_json_object(url, params):
+    params['timestamp'] = int(time.time() * 1000)
+    sorted_data = sorted(params.items(), reverse=False)
+    s = ""
+    for k, v in sorted_data:
+        s = s + str(k) + "=" + str(v)
     sign = md5(s).lower()
     params['sign'] = sign
 
@@ -214,26 +204,27 @@ def get_wending_json_object(url,params):
     endtime = params['endtime']
     page = params['page']
     ## +'&secretkey='+str(secretkey)
-    parameter = 'consumerkey='+str(consumerkey)+'&timestamp='+str(timestamp)+'&siteid='+str(siteid)+'&pageSize='+str(pageSize)\
-              +'&starttime='+str(starttime)+'&endtime='+str(endtime)+'&page='+str(page)+'&sign='+str(sign)
+    parameter = 'consumerkey=' + str(consumerkey) + '&timestamp=' + str(timestamp) + '&siteid=' + str(
+        siteid) + '&pageSize=' + str(pageSize) \
+                + '&starttime=' + str(starttime) + '&endtime=' + str(endtime) + '&page=' + str(page) + '&sign=' + str(
+        sign)
     global get_url
-    get_url = url+"?"+parameter
-
+    get_url = url + "?" + parameter
 
     while True:
-        r= requests.get(url=get_url)
-        if r.status_code==200:
+        r = requests.get(url=get_url)
+        if r.status_code == 200:
             break
         else:
             time.sleep(1)
             print("请求连接出错,等待1s...")
 
-    response_result_json=r.json()
+    response_result_json = r.json()
     del params['sign']
     return response_result_json
 
 
-def get_wd_order_task(start,end,account):
+def get_wd_order_task(start, end, account):
     order_list = []
     url = 'https://bi.reading.163.com/dist-api/rechargeList'
     consumerkey = account[0]
@@ -245,8 +236,8 @@ def get_wd_order_task(start,end,account):
     if len(siteid_list) == 0:
         siteid_list.append(siteid)
 
-    starttime = du.date_str_to_str(start)+'0000'
-    endtime = du.date_str_to_str(end)+'2359'
+    starttime = du.date_str_to_str(start) + '0000'
+    endtime = du.date_str_to_str(end) + '2359'
 
     for siteid in siteid_list:
 
@@ -268,7 +259,6 @@ def get_wd_order_task(start,end,account):
             order_item_list = response_result_json['data']['rechargeList']
 
             for x in order_item_list:
-
                 order_time = DateUtils.stamp_to_str(x['createTime'])
                 reg_time = DateUtils.stamp_to_str(x['userRegisterTime'])
 
@@ -286,19 +276,19 @@ def get_wd_order_task(start,end,account):
                      x['ewTradeId'] if x.get('ewTradeId') else x['rechargeUuid'],
                      2 if x['payStatus'] == 1 else 1
                      # ,x['userId']
-                    )
+                     )
                 )
             if len(order_item_list) < 1000:
                 break
             else:
                 page += 1
     print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}")
-    if order_list.__len__()>0:
+    if order_list.__len__() > 0:
         # print(order_list)
         save_order(order_list)
 
 
-def get_gf_order_task(start,end,account):
+def get_gf_order_task(start, end, account):
     order_list = []
     url = 'https://bi.reading.163.com/dist-api/rechargeList'
     consumerkey = account[0]
@@ -310,8 +300,8 @@ def get_gf_order_task(start,end,account):
     if len(siteid_list) == 0:
         siteid_list.append(siteid)
 
-    starttime = du.date_str_to_str(start)+'0000'
-    endtime = du.date_str_to_str(end)+'2359'
+    starttime = du.date_str_to_str(start) + '0000'
+    endtime = du.date_str_to_str(end) + '2359'
 
     for siteid in siteid_list:
 
@@ -329,11 +319,10 @@ def get_gf_order_task(start,end,account):
 
             response_result_json = get_wending_json_object(url, params)
             # print(response_result_json)
-
+            print(response_result_json)
             order_item_list = response_result_json['data']['rechargeList']
-
+            print(order_item_list)
             for x in order_item_list:
-
                 order_time = DateUtils.stamp_to_str(x['createTime'])
                 reg_time = DateUtils.stamp_to_str(x['userRegisterTime'])
 
@@ -351,20 +340,23 @@ def get_gf_order_task(start,end,account):
                      x['ewTradeId'] if x.get('ewTradeId') else x['rechargeUuid'],
                      2 if x['payStatus'] == 1 else 1
                      # ,x['userId']
-                    )
+                     )
                 )
             if len(order_item_list) < 1000:
                 break
             else:
                 page += 1
     print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}")
-    if order_list.__len__()>0:
+    if order_list.__len__() > 0:
         print(order_list)
+        amount_sum = 0
+        for _ in order_list:
+            amount_sum = amount_sum + _[8]
+        print(amount_sum)
         save_order(order_list)
 
 
-def get_zd_order_task(start,end,account):
-
+def get_zd_order_task(start, end, account):
     """开始到结束最多90天"""
     order_list = []
     url = 'https://api.zhangdu520.com/channel/getorder'
@@ -377,8 +369,8 @@ def get_zd_order_task(start,end,account):
     sign = md5(str(uid) + '&' + appsecert + '&' + str(timestamp))
 
     for i in du.split_date2(start, end, 90):
-        starttime = DateUtils.str_to_stamp(i[0]+' 00:00:00','%Y-%m-%d %H:%M:%S')
-        endtime = DateUtils.str_to_stamp(i[1]+' 23:59:59','%Y-%m-%d %H:%M:%S')
+        starttime = DateUtils.str_to_stamp(i[0] + ' 00:00:00', '%Y-%m-%d %H:%M:%S')
+        endtime = DateUtils.str_to_stamp(i[1] + ' 23:59:59', '%Y-%m-%d %H:%M:%S')
         page = 1
         while True:
             params = {
@@ -402,7 +394,6 @@ def get_zd_order_task(start,end,account):
 
             order_item_list = result_data['list']
             for i in order_item_list:
-
                 order_time = DateUtils.stamp_to_str(i['ctime'])
                 reg_time = DateUtils.stamp_to_str(i['regtime'])
                 order_list.append((
@@ -433,7 +424,6 @@ def get_zd_order_task(start,end,account):
 
 
 def get_zzy_order_task(start, end, account):
-
     url = 'https://inovel.818tu.com/partners/channel/channels/list?'
     key = account[0]
     secert = account[1]
@@ -460,9 +450,8 @@ def get_zzy_order_task(start, end, account):
 
 
 def get_zzy_channel_order(start, end, account, channel):
-
-    get_time=DateUtils.str_to_date_str(start, f2="%Y-%m-%dT%H:%M:%S+08:00")
-    limit_time=DateUtils.str_to_date_str(end, f2="%Y-%m-%dT%H:%M:%S+08:00")
+    get_time = DateUtils.str_to_date_str(start, f2="%Y-%m-%dT%H:%M:%S+08:00")
+    limit_time = DateUtils.str_to_date_str(end, f2="%Y-%m-%dT%H:%M:%S+08:00")
     order_list = []
     key = account[0]
     secert = account[1]
@@ -502,10 +491,9 @@ def get_zzy_channel_order(start, end, account, channel):
         order_item_list = response_result_json['data']['items']  # 订单列表
 
         for i in order_item_list:
-
-
             order_time = DateUtils.str_to_date_str(i['created_at'], "%Y-%m-%dT%H:%M:%S+08:00", "%Y-%m-%d %H:%M:%S")
-            reg_time = DateUtils.str_to_date_str(i['member']['created_at'], "%Y-%m-%dT%H:%M:%S+08:00", "%Y-%m-%d %H:%M:%S")
+            reg_time = DateUtils.str_to_date_str(i['member']['created_at'], "%Y-%m-%dT%H:%M:%S+08:00",
+                                                 "%Y-%m-%d %H:%M:%S")
 
             order_list.append((
                 order_time[:10],
@@ -523,7 +511,6 @@ def get_zzy_channel_order(start, end, account, channel):
                 # ,i['id']
             ))
 
-
         if int(page) >= math.ceil(total_count / int(per_page)):
             break
         page = int(page) + 1
@@ -548,7 +535,7 @@ if __name__ == '__main__':
     # print(DateUtils.stamp_to_str(1612155476,'%Y-%m-%d %H:%M:%S')[:10])
     # exit(0)
 
-    st= et = '2021-05-07'
+    st = et = '2021-05-07'
 
     account = "62140324,KUUxPIokqtIrtvHQ,1025010,趣程19期,qucheng19qi@163.com"
-    get_wd_order_task(st,et,account.split(','))
+    get_wd_order_task(st, et, account.split(','))
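The `get_wending_json_object` helper above signs each request by sorting the parameters, concatenating them as `k=v` pairs, and MD5-hashing the result. A minimal sketch of that signing scheme, assuming a hex-digest `md5` helper like the one this module imports from `model.ComUtils`:

import hashlib
import time

def md5(s: str) -> str:
    # stand-in for model.ComUtils.md5 (assumed to return a hex digest)
    return hashlib.md5(s.encode('utf-8')).hexdigest()

def sign_params(params: dict) -> dict:
    """Wending-style signing: sort keys, join as k=v with no separator, MD5 the string."""
    params['timestamp'] = int(time.time() * 1000)
    plain = ''.join('{}={}'.format(k, v) for k, v in sorted(params.items()))
    params['sign'] = md5(plain).lower()
    return params

print(sign_params({'consumerkey': 'demo', 'siteid': 1, 'pagesize': 500, 'page': 1}))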

+ 131 - 98
app/api_data/tx_ad_cost/cost_util.py

@@ -3,6 +3,7 @@ import requests
 import time
 import pymysql
 import logging
+import pandas
 from concurrent.futures import ThreadPoolExecutor
 from model.DataBaseUtils import MysqlUtils
 from model.ComUtils import *
@@ -77,7 +78,7 @@ def get_campaign(account_id, access_token, flag, campaign_ids, dt):
 
 
 def get_adcreatives(account_id, access_token, flag, adc_ids, dt):  # 获取创意
-    # 接口https://developers.e.qq.com/docs/api/adsmanagement/adcreatives/adcreatives_get?version=1.3
+    # API docs: https://developers.e.qq.com/docs/api/adsmanagement/adcreatives/adcreatives_get?version=1.3
     url = 'https://api.e.qq.com/v1.1/adcreatives/get'
     li = []
     page = 1
@@ -191,7 +192,17 @@ def get_adcreatives(account_id, access_token, flag, adc_ids, dt):  # 获取创
 
 
 def images_info_get(account_id, access_token, image_ids):  # 获取图片信息
-    # 接口https://developers.e.qq.com/docs/api/business_assets/image/images_get?version=1.3
+    # API docs: https://developers.e.qq.com/docs/api/business_assets/image/images_get?version=1.3
+    # 1. Work out which image_ids still need to be fetched
+    id_content = ','.join([''' '{}' '''.format(i) for i in image_ids.split(',')])
+    id_content = id_content[:-1]
+    sql = ''' select image_id from image_info vi 
+                       where image_id  in ({});'''.format(id_content)
+    rs = db.quchen_text.getData(sql)
+    id_all_set = set([i for i in image_ids.split(',') if len(i) > 0])
+    id_have = set([i[0] for i in rs])
+    image_ids = id_all_set - id_have
+
     fields = ('image_id', 'width', 'height', 'file_size', 'signature', 'preview_url')
     interface = 'images/get'
     url = 'https://api.e.qq.com/v1.3/' + interface
@@ -199,123 +210,137 @@ def images_info_get(account_id, access_token, image_ids):  # 获取图片信息
     page = 1
     li = []
 
-    while True:
-
-        common_parameters = {
-            'access_token': access_token,
-            'timestamp': int(time.time()),
-            'nonce': str(time.time()) + str(random.randint(0, 999999)),
-            'fields': fields
-        }
+    for image_id in image_ids:
+        if len(image_id) < 1:
+            continue
+        page = 1  # reset paging per id, or a multi-page id leaves `page` too high for the next one
+        while True:
 
-        parameters = {
-            "account_id": account_id,
-            "filtering": [{
-                "field": "image_id",
-                "operator": "IN",
-                "values": image_ids.split(',')
+            common_parameters = {
+                'access_token': access_token,
+                'timestamp': int(time.time()),
+                'nonce': str(time.time()) + str(random.randint(0, 999999)),
+                'fields': fields
+            }
 
-            }],
-            "page": page,
-            "page_size": 100
-        }
+            parameters = {
+                "account_id": account_id,
+                "filtering": [{
+                    "field": "image_id",
+                    "operator": "IN",
+                    "values": [image_id]
 
-        parameters.update(common_parameters)
-        for k in parameters:
-            if type(parameters[k]) is not str:
-                parameters[k] = json.dumps(parameters[k])
+                }],
+                "page": page,
+                "page_size": 100
+            }
 
-        while True:
-            h = requests.get(url, params=parameters)
-            # logging.info(h.text)
-            if h.status_code == 200:
-                r = h.json()
-                break
-            else:
-                time.sleep(1)
-                logging.info("请求出错 等待1s..")
+            parameters.update(common_parameters)
+            for k in parameters:
+                if type(parameters[k]) is not str:
+                    parameters[k] = json.dumps(parameters[k])
+
+            while True:
+                h = requests.get(url, params=parameters)
+                # logging.info(h.text)
+                if h.status_code == 200:
+                    r = h.json()
+                    break
+                else:
+                    time.sleep(1)
+                    logging.info("请求出错 等待1s..")
 
-        if 'data' in r.keys():
-            li.extend(r['data']['list'])
+            if 'data' not in r.keys():
+                break  # error payload: no data or page_info to read for this id
+            li.extend(r['data']['list'])
 
-        total_page = r['data']['page_info']['total_page']
-        if total_page > page:
-            page += 1
-        else:
-            break
-    # logging.info(li)
+            total_page = r['data']['page_info']['total_page']
+            if total_page > page:
+                page += 1
+            else:
+                break
     data = []
     for i in li:
-        data.append((i['image_id'], i['width'], i['height'], i['signature'], i['preview_url']))
-    # logging.info(data)
-    logging.info(f"{account_id} 有图片:" + str(li.__len__()))
+        data.append(
+            (i['image_id'], i['width'], i['height'], i['signature'], i['preview_url'], i['file_size']))
+    logging.info(f"{account_id} 有图片:" + str(li.__len__()))
     if li.__len__() > 0:
-        sql = "replace into image_info value (%s,%s,%s,%s,%s)"
+        sql = "insert IGNORE into image_info (image_id,width,height,signature,preview_url,size) value (%s,%s,%s,%s,%s,%s)"
         db.quchen_text.executeMany(sql, data)
         db.close()
 
 
 def video_info_get(account_id, access_token, image_ids):  # 获取视频信息
-    # 接口https://developers.e.qq.com/docs/api/business_assets/video/videos_get?version=1.3
+    # API docs: https://developers.e.qq.com/docs/api/business_assets/video/videos_get?version=1.3
+
+    # 1. Check the database first: only fetch ids we don't already have
+    id_content = ','.join([''' '{}' '''.format(i) for i in image_ids.split(',')])
+    id_content = id_content[:-1]
+    sql = ''' select video_id from video_info vi 
+                   where video_id  in ({});'''.format(id_content)
+    rs = db.quchen_text.getData(sql)
+    id_all_set = set([i for i in image_ids.split(',') if len(i) > 0])
+    id_have = set([i[0] for i in rs])
+    image_ids = id_all_set - id_have
+
+    # 2. Fetch info for the remaining ids
     fields = ('video_id', 'width', 'height', 'file_size', 'signature', 'preview_url')
     interface = 'videos/get'
     url = 'https://api.e.qq.com/v1.3/' + interface
 
     page = 1
     li = []
+    for image_id in image_ids:
+        if len(image_id) < 1:
+            continue
+        page = 1  # reset paging per id, or a multi-page id leaves `page` too high for the next one
+        while True:
 
-    while True:
-
-        common_parameters = {
-            'access_token': access_token,
-            'timestamp': int(time.time()),
-            'nonce': str(time.time()) + str(random.randint(0, 999999)),
-            'fields': fields
-        }
-
-        parameters = {
-            "account_id": account_id,
-            "filtering": [{
-                "field": "media_id",
-                "operator": "IN",
-                "values": image_ids.split(',')
+            common_parameters = {
+                'access_token': access_token,
+                'timestamp': int(time.time()),
+                'nonce': str(time.time()) + str(random.randint(0, 999999)),
+                'fields': fields
+            }
+            parameters = {
+                "account_id": account_id,
+                "filtering": [{
+                    "field": "media_id",
+                    "operator": "IN",
+                    "values": [image_id]
+
+                }],
+                "page": page,
+                "page_size": 100
+            }
 
-            }],
-            "page": page,
-            "page_size": 100
-        }
+            parameters.update(common_parameters)
+            for k in parameters:
+                if type(parameters[k]) is not str:
+                    parameters[k] = json.dumps(parameters[k])
+
+            while True:
+                h = requests.get(url, params=parameters)
+                # logging.info(h.text)
+                if h.status_code == 200:
+                    r = h.json()
+                    break
+                else:
+                    time.sleep(1)
+                    logging.info("请求出错 等待1s..")
 
-        parameters.update(common_parameters)
-        for k in parameters:
-            if type(parameters[k]) is not str:
-                parameters[k] = json.dumps(parameters[k])
+            if 'data' not in r.keys():
+                break  # error payload: no data or page_info to read for this id
+            li.extend(r['data']['list'])
 
-        while True:
-            h = requests.get(url, params=parameters)
-            # logging.info(h.text)
-            if h.status_code == 200:
-                r = h.json()
-                break
+            total_page = r['data']['page_info']['total_page']
+            if total_page > page:
+                page += 1
             else:
-                time.sleep(1)
-                logging.info("请求出错 等待1s..")
-
-        if 'data' in r.keys():
-            li.extend(r['data']['list'])
-
-        total_page = r['data']['page_info']['total_page']
-        if total_page > page:
-            page += 1
-        else:
-            break
-    # logging.info(li)
+                break
     data = []
     for i in li:
-        data.append((i['video_id'], i['width'], i['height'], i['signature'], i['preview_url']))
-    # logging.info(data)
-    logging.info(f"{account_id} 视频:" + str(li.__len__()))
+        data.append((i['video_id'], i['width'], i['height'], i['signature'],
+                     i['preview_url'], i['file_size']))
+    logging.info(f"{account_id} 获取到新视频:" + str(li.__len__()))
     if li.__len__() > 0:
-        sql = "replace into video_info value (%s,%s,%s,%s,%s)"
+        sql = "insert IGNORE into video_info (video_id,width,height,signature,preview_url,size) value (%s,%s,%s,%s,%s,%s)"
         db.quchen_text.executeMany(sql, data)
         db.close()
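Both `images_info_get` and `video_info_get` now follow the same incremental pattern: query which ids are already stored, call the API only for the difference, and land rows with `insert IGNORE` so reruns stay idempotent. A condensed sketch of the pattern, with `fetch_from_api` as a hypothetical stand-in for the paged HTTP calls above:

def fetch_missing(db, table, key_col, ids, fetch_from_api):
    """Fetch only ids not yet present in `table`, then insert-ignore the rows."""
    wanted = {i for i in ids if i}
    if not wanted:
        return
    in_list = ','.join("'{}'".format(i) for i in wanted)
    have = {r[0] for r in db.getData(
        'select {k} from {t} where {k} in ({p})'.format(k=key_col, t=table, p=in_list))}
    data = []
    for i in wanted - have:  # only the genuinely new ids hit the API
        data.extend(fetch_from_api(i))
    if data:
        marks = ','.join(['%s'] * len(data[0]))
        db.executeMany('insert ignore into {} values ({})'.format(table, marks), data)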
 
@@ -411,7 +436,7 @@ def get_ad_cost_day(account_id, access_token, flag, st, et):
 
 
 def ad_cost_day_gdt(account_id, access_token, st, et):
-    # 接口文档https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
+    # API docs: https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
     url = 'https://api.e.qq.com/v1.3/daily_reports/get'
     fields = (
         'date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'ctr', 'follow_count', 'web_order_count', 'order_amount')
@@ -437,9 +462,9 @@ def ad_cost_day_gdt(account_id, access_token, st, et):
         for k in parameters:
             if type(parameters[k]) is not str:
                 parameters[k] = json.dumps(parameters[k])
-
         while True:
-            r = requests.get(url, params=parameters).json()
+            r = requests.get(url, params=parameters)
+            r = r.json()
             # logging.info(r)
             code = r['code']
             if code == 11017:
@@ -498,7 +523,8 @@ def ad_cost_day_mp(account_id, access_token, st, et):
                 parameters[k] = json.dumps(parameters[k])
 
         while True:
-            r = requests.get(url, params=parameters).json()
+            r = requests.get(url, params=parameters)
+            r = r.json()
             # logging.info(r['data']['list'])
             # import pandas as pd
             # logging.info(pd.DataFrame(r['data']['list']))
@@ -518,7 +544,6 @@ def ad_cost_day_mp(account_id, access_token, st, et):
                             i['valid_click_count'],
                             i['official_account_follow_count'], i['order_count'], i['order_amount'] / 100, account_id,
                             'MP'
-
                         )
                     )
 
@@ -530,8 +555,16 @@ def ad_cost_day_mp(account_id, access_token, st, et):
     # logging.info(li)
     # exit()
     if len(li) > 0:
-        logging.info(f"{account_id} have ad cost :{len(li)} ")
-        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', li)
+        # TODO: ask what adgroup_id and campaign_id are used for
+        li_df = pandas.DataFrame(li)
+        li_df_g = li_df.groupby([0, 1, 9, 10])
+        li_new = []
+        for index, row in li_df_g.agg('sum').iterrows():
+            new_row = row.tolist()
+            new_row = list(index[0:2]) + new_row[1:] + list(index[2:])
+            li_new.append(tuple(new_row))
+        logging.info(f"{account_id} have ad cost :{len(li_new)} ")
+        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', li_new)
         db.close()
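The `pandas` block above deduplicates cost rows by grouping on the positional key columns `[0, 1, 9, 10]` and summing the metric columns between them, then stitching keys and sums back into insert-ready tuples. A self-contained sketch of that reassembly on toy six-column rows (the column layout here is illustrative, not the real `ad_cost_day` schema):

import pandas as pd

rows = [
    ('2021-05-20', 111, 10.0, 100, 'acc1', 'MP'),
    ('2021-05-20', 111, 5.0, 50, 'acc1', 'MP'),   # same key: gets summed away
    ('2021-05-20', 222, 3.0, 30, 'acc1', 'MP'),
]
df = pd.DataFrame(rows)
grouped = df.groupby([0, 1, 4, 5]).agg('sum')  # keys: date, ad_id, account, channel

merged = []
for index, row in grouped.iterrows():
    # index holds the 4 group keys, row holds the summed metric columns
    merged.append(tuple(list(index[0:2]) + row.tolist() + list(index[2:])))

print(merged)
# [('2021-05-20', 111, 15.0, 150, 'acc1', 'MP'),
#  ('2021-05-20', 222, 3.0, 30, 'acc1', 'MP')]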
 
 

+ 15 - 14
app/api_data/tx_ad_cost/get_cost_older.py

@@ -49,6 +49,7 @@ def daily_reports_get(access_token, account_id, st, et, level, fields, err_num=0
             parameters[k] = json.dumps(parameters[k])
 
+    logging.info('account_id: {} 开始获取消耗数据'.format(account_id))
    r = requests.get(url, params=parameters).json()
     if r['code'] != 0:
         logging.warning(
             'access_token:{} code:{} message:{}'.format(str(access_token), str(r['code']), str(r['message'])))
@@ -102,6 +103,20 @@ def get_qq_list():
     return a
 
 
+def mysql_insert_daily_vx(data):
+    b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
+	order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)
+	 values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
+    db.quchen_text.executeMany(b, data)
+
+
+def mysql_insert_daily_qq(data):
+    a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,
+	web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,
+	from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
+    db.quchen_text.executeMany(a, data)
+
+
 def get_daily_vx(st, et):
     token_list_v = get_vx_list()
     logging.info("获取vx账号:" + str(token_list_v.__len__()))
@@ -130,20 +145,6 @@ def get_daily_qq(st, et):
     mysql_insert_daily_qq(li)
 
 
-def mysql_insert_daily_vx(data):
-    b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
-	order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)
-	 values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-    db.quchen_text.executeMany(b, data)
-
-
-def mysql_insert_daily_qq(data):
-    a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,
-	web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,
-	from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-    db.quchen_text.executeMany(a, data)
-
-
 def run(st, et):
     logging.info('微信消耗数据拉取,开始')
     get_daily_vx(st, et)

+ 86 - 28
app/etl/MaterialLibrary/MaterialDataClean.py

@@ -4,57 +4,66 @@ desc : 素材库数据清洗
 
 """
 from model.DataBaseUtils import MysqlUtils
-db =MysqlUtils()
+
+db = MysqlUtils()
+
 
 def title():
     sql = """select REPLACE(REPLACE(title, CHAR(10), ''), CHAR(13), '') content,
-        sum(cost) consume_amount,
+        cast(sum(cost) as float) consume_amount,
         sum(click_count) click_times,
         sum(view_count) view_times,
+        cast(count(*) as decimal(10,2)) use_times,
         group_concat(distinct book) novels,
         max(dt) end_date,min(dt) start_date 
         from dw_image_cost_day where title!='' and title is not null  GROUP BY REPLACE(REPLACE(title, CHAR(10), ''), CHAR(13), '')
 				"""
 
-    df =  db.dm.getData_pd(sql)
-    # print(df)
+    df = db.dm.pd_data_sql(sql)
+
     df["data_type"] = 'all'
-    df['type'] = 1
-    df['create_by'] = 0
+    df['type'] = '1'
+    df['create_by'] = '0'
 
     key = ["content", "type"]
-    tag = ["view_times", "click_times", "novels", "start_date", "end_date", "create_by",'data_type', 'consume_amount']
+    tag = ["view_times", "click_times", "novels", "start_date", "end_date", "create_by", 'data_type',
+           'consume_amount']
     table = "t_ads_content"
-
     db.zx_ads.dfsave2mysql(df, table, key, tag)
 
+
 def description():
     sql = """select REPLACE(REPLACE(description, CHAR(10), ''), CHAR(13), '') content,
             sum(cost) consume_amount,
             sum(click_count) click_times,
             sum(view_count) view_times,
+            cast(count(*) as decimal(10,2)) use_times,
             group_concat(distinct book) novels,
             max(dt) end_date,min(dt) start_date
             from dw_image_cost_day where description!='' and description is not null  GROUP BY REPLACE(REPLACE(description, CHAR(10), ''), CHAR(13), '')
     				"""
 
-    df = db.dm.getData_pd(sql)
+    df = db.dm.pd_data_sql(sql)
+
     # print(df)
 
     df["data_type"] = 'all'
-    df['type'] = 2
-    df['create_by'] = 0
+    df['type'] = '2'
+    df['create_by'] = '0'
     key = ["content", "type"]
-    tag = ["view_times", "click_times", "novels", "start_date", "end_date", "create_by",'data_type', 'consume_amount']
+    tag = ['use_times', "view_times", "click_times", "novels", "start_date", "end_date", "create_by", 'data_type',
+           'consume_amount']
     table = "t_ads_content"
 
     db.zx_ads.dfsave2mysql(df, table, key, tag)
 
 
 def image():
-    sql="""select signature,sum(consume_amount) consume_amount,
+
+    sql = """select signature,sum(consume_amount) consume_amount,
             sum(click_times) click_times,
             sum(view_times) view_times,
+            sum(use_times) use_times,
             group_concat(distinct novels) novels ,
             max(end_date) end_date,
             min(start_date) start_date,
@@ -62,10 +71,16 @@ def image():
             min(type) type,  
             if(locate(',',signature)>0,0,1) single_img,
             min(width ) width ,
-            min(height ) height 
+            min(height ) height ,
+            min(media_size) media_size ,
+            min(media_format) media_format,
+            min(video_length) video_length,
+            min(video_bit_rate) video_bit_rate,
+            0 max_media_size 
         from (select replace(signature,' ,','') as signature ,
             sum(cost) consume_amount,
             sum(click_count) click_times,
+            sum(use_times) use_times,
             sum(view_count) view_times,
             group_concat(distinct book) novels ,
             max(dt) end_date,
@@ -74,7 +89,11 @@ def image():
             if(is_video=1,2,1) type,  
             if(locate(',',signature)>0,0,1) single_img,
             min(replace(if(left (width ,2)='0,',substring(width ,3),width) ,',0','')) width ,
-			min(replace(if(left (height ,2)='0,',substring(height ,3),height) ,',0','')) height 
+			min(replace(if(left (height ,2)='0,',substring(height ,3),height) ,',0','')) height,
+			min(replace(if(left (size ,2)='0,',substring(size ,3),size) ,',0','')) media_size ,
+            min(replace(format ,' ,','')) media_format,
+            min(video_length) video_length,
+            min(video_bit_rate) video_bit_rate
             from dw_image_cost_day  
             where signature is not null and signature !=''  
             and length (replace (replace (signature,',',''),' ',''))>0
@@ -82,26 +101,37 @@ def image():
             group by signature   
             """
 
-    df = db.dm.getData_pd(sql)
+    # df = db.dm.getData_pd(sql)
+    df = db.dm.pd_data_sql(sql)
     # print(df)
-
-    df['create_by'] = 0
+    # data conversion: derive max_media_size from the comma-joined media_size list
+    for i in range(len(df['media_size'])):
+        if not df['media_size'][i]:
+            continue
+        size_list = df['media_size'][i].split(',')
+        max_size = 0.0
+        for size_data in size_list:
+            # compare as numbers; assigning str(size_data) here made the next
+            # float-vs-str comparison raise TypeError
+            if size_data != 'None' and float(size_data) > max_size:
+                max_size = float(size_data)
+        df.loc[i, 'max_media_size'] = max_size
+
+    df['create_by'] = '0'
     df["data_type"] = 'all'
 
     key = ["signature"]
-    tag = ["view_times", "click_times", "novels", "start_date", "end_date", "create_by", "single_img", "content",'consume_amount','type','width','height']
+    tag = ['media_size', 'media_format', 'video_length', 'video_bit_rate', 'use_times', "view_times", "click_times", "novels", "start_date", "end_date", "create_by", "single_img",
+           "content", 'consume_amount', 'type', 'width', 'height']
     table = "t_ads_media"
 
     db.zx_ads.dfsave2mysql(df, table, key, tag)
 
 
-
-
-
 def adcreative():
-    sql="""select  signature,title,article,
+    sql = """select  signature,title,article,
 sum(click_times) click_times,
 sum(view_times) view_times,
+sum(use_times) use_times,
 sum(consume_amount) consume_amount,
 group_concat(distinct novels) novels,
 min(start_date) start_date,
@@ -111,17 +141,27 @@ min(channel) channel ,
 min(type) type,
 if(locate(',',signature)>0,0,1) single_img,
 min(width) width ,
-min(height) height 
+min(height) height,
+min(media_size) media_size ,
+min(media_format) media_format,
+min(video_length) video_length,
+min(video_bit_rate) video_bit_rate,
+0 max_media_size 
 from 
  (select replace(signature ,' ,','') as signature,title,description article,
 sum(click_count) click_times,
 sum(view_count) view_times,
+sum(use_times) use_times,
 sum(cost) consume_amount,
 group_concat(distinct book) novels,
 min(dt) start_date,max(dt) end_date,
 min(replace(preview_url ,' ,','')) media,
 min(replace(if(left (width ,2)='0,',substring(width ,3),width) ,',0','')) width ,
 min(replace(if(left (height ,2)='0,',substring(height ,3),height) ,',0','')) height ,
+min(replace(if(left (size ,2)='0,',substring(size ,3),size) ,',0','')) media_size ,
+min(replace(format ,' ,','')) media_format,
+min(video_length) video_length,
+min(video_bit_rate) video_bit_rate,
 type channel,
 if(is_video=1,2,1) type,
 if(locate(',',signature)>0,0,1) single_img
@@ -129,10 +169,24 @@ from dw_image_cost_day where signature is not null and signature!=''
 GROUP BY  signature,title,description,type,is_video) as foo
 group by signature ,title,article  """
 
-    df = db.dm.getData_pd(sql)
-
-    key = ["signature",'title','article']
-    tag = ["view_times", "click_times", "novels", "start_date", "end_date","type","channel",'consume_amount','single_img','media','width','height']
+    # df = db.dm.getData_pd(sql)
+    df = db.dm.pd_data_sql(sql)
+    # data conversion: derive max_media_size from the comma-joined media_size list
+    for i in range(len(df['media_size'])):
+        if not df['media_size'][i]:
+            continue
+        size_list = df['media_size'][i].split(',')
+        max_size = 0.0
+        for size_data in size_list:
+            # compare as numbers; assigning str(size_data) here made the next
+            # float-vs-str comparison raise TypeError
+            if size_data != 'None' and float(size_data) > max_size:
+                max_size = float(size_data)
+        df.loc[i, 'max_media_size'] = max_size
+
+    key = ["signature", 'title', 'article']
+    tag = ['media_size', 'media_format', 'video_length', 'video_bit_rate', 'max_media_size', 'use_times', "view_times",
+           "click_times", "novels", "start_date", "end_date", "type", "channel",
+           'consume_amount', 'single_img', 'media', 'width', 'height']
     table = "t_ads_idea"
 
     db.zx_ads.dfsave2mysql(df, table, key, tag)
@@ -147,3 +201,7 @@ def run():
 
 if __name__ == '__main__':
     run()
+    # title()
+    # description()
+    # image()
+    # adcreative()
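Both loops above scan a comma-joined string like '1024,None,2048' for the largest numeric token. The same parse can be expressed once and mapped over the frame; a sketch, assuming the 'None' placeholder convention produced by dw_image_cost_day:

import pandas as pd

def max_media_size(joined):
    """Largest numeric token in a comma-joined size string, 0.0 if none."""
    if not joined:
        return 0.0
    sizes = [float(s) for s in joined.split(',') if s and s != 'None']
    return max(sizes, default=0.0)

df = pd.DataFrame({'media_size': ['1024,None,2048', '', None, '512']})
df['max_media_size'] = df['media_size'].map(max_media_size)
print(df['max_media_size'].tolist())  # [2048.0, 0.0, 0.0, 512.0]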

+ 35 - 20
app/etl/dw/dw_image_cost_day.py

@@ -1,16 +1,17 @@
 import logging
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
 from model.DateUtils import DateUtils
 from model.DingTalkUtils import DingTalkUtils
 # logging.getLogger().setLevel(logging.WARNING)
 import pandas as pd
+
 db = MysqlUtils()
 ck = CkUtils()
-du=DateUtils()
+du = DateUtils()
 
 
 def run(dt):
-    sql=f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
+    sql = f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
             title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
             from 
             ad_cost_day a 
@@ -34,48 +35,60 @@ def run(dt):
     for i in data:
         # print(i)
         li.extend(i[-1].split(','))
-    #TODO:之后如果一天产生的图片过多,可能超过sql的字符限制,
+    # TODO: if one day ever yields too many image ids, this may exceed the SQL length limit;
    # move the storage to Hive later on
 
-    sql3 = f"select image_id,preview_url,signature,width,height from image_info where  image_id in ({str(set(li))[1:-1]})"
+    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where  image_id in ({str(set(li))[1:-1]})"
 
     image_di = {}
     image_data = db.quchen_text.getData(sql3)
     for x in image_data:
-        image_di[x[0]] = (x[1],x[2],x[3],x[4])
+        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
 
     # print(image_di)
 
     for i in data:
         preview_url = ''
         signature = ''
-        width = ''
-        height = ''
+        width = ',0'
+        height = ',0'
         image_id = ''
+        size = ''
+        media_type = ''  # renamed from `type` to avoid shadowing the builtin
+        video_length = None
+        video_bit_rate = None
         for j in i[-1].split(','):
             if image_di.get(j):
                 image_id = image_id + ',' + j
                 preview_url = preview_url + ',' + image_di.get(j)[0]
                 signature = signature + ',' + image_di.get(j)[1]
-                width = width + ',' + str(image_di.get(j)[2])
-                height = height + ',' + str(image_di.get(j)[3])
+                # keep a leading comma: i.append(width[1:]) below strips the first
+                # character, and a bare value would lose its first digit
+                width = ',' + str(image_di.get(j)[2])
+                height = ',' + str(image_di.get(j)[3])
+                size = size + ',' + str(image_di.get(j)[4])
+                media_type = media_type + ',' + str(image_di.get(j)[5])
             else:
-                image_id = image_id + ',' +j
+                image_id = image_id + ',' + j
                 preview_url = preview_url + ',' + ' '
                 signature = signature + ',' + ' '
-                width = width + ',' + '0'
-                height = height + ',' + '0'
-        i[-1]=image_id[1:]
+                width = ',0'
+                height = ',0'
+                size = size + ',' + '0'
+                media_type = media_type + ',' + ' '
+        i[-1] = image_id[1:]
         i.append(preview_url[1:])
         i.append(signature[1:])
         i.append(0)
         i.append(width[1:])
         i.append(height[1:])
+        i.append(size[1:])
+        i.append(media_type[1:])
+        i.append(video_length)
+        i.append(video_bit_rate)
 
     # exit(0)
     sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
             title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
-            g.width,g.height
+            g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate 
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
@@ -86,14 +99,14 @@ def run(dt):
             where a.dt='{dt}'  and c.is_video=1
             group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
 
-
     data_video = db.quchen_text.get_data_list(sql_video)
     data.extend(data_video)
 
-    #进行数据存储
+    # persist the results
     db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
-    db.dm.executeMany("replace into  dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",data)
-
+    db.dm.executeMany(
+        "replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
+        data)
 
 
 def hourly():
@@ -104,6 +117,7 @@ def hourly():
     except:
         DingTalkUtils().send("广告数据清洗失败")
 
+
 def day():
     logging.info('广告数据清洗,开始')
     for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
@@ -111,9 +125,10 @@ def day():
         run(i)
     logging.info('广告数据清洗,结束')
 
+
 if __name__ == '__main__':
     # run('2021-05-18')
 
-    for i in du.getDateLists('2021-05-01','2021-05-20'):
+    for i in du.getDateLists(du.get_n_days(-3), du.get_n_days(0)):
         print(i)
         run(i)
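`run(dt)` is written to be idempotent per day: it deletes the `dt` slice of `dw_image_cost_day` and then re-inserts the freshly computed rows, so the `__main__` loop can safely re-run recent days. The pattern in isolation, sketched against the `MysqlOperation` API shown later in this commit:

def rebuild_day(db, dt, rows):
    """Rebuild one day's slice; safe to re-run because stale rows are removed first."""
    db.execute('delete from dw_image_cost_day where dt=%s', (dt,))
    if rows:
        marks = ','.join(['%s'] * len(rows[0]))
        db.executeMany('replace into dw_image_cost_day values ({})'.format(marks), rows)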

+ 82 - 8
example/update_cost_data.py

@@ -6,21 +6,27 @@ from app.etl.dw.dw_channel_daily import dw_channel_daily
 from app.etl.dw.dw_pitcher_daily import dw_pitcher_trend
 from app.etl.src.src_book_info import src_book_info
 from app.etl.dw.dw_book_trend import book_trend
+from model.DateUtils import DateUtils
+from model.DataBaseUtils import MysqlUtils
+from app.api_data.tx_ad_cost.get_cost_older import get_v_data, mysql_insert_daily_vx, get_q_data, mysql_insert_daily_qq
+import logging
+import time
+from logging import handlers
 
+db = MysqlUtils()
 du = DateUtils()
 
 
-def get_data():
-    #1.获取数据
-    st = du.get_n_days(-2)
-    et = du.get_n_days(0)
+def get_data(st, et):
+    # 1. fetch the data
+
    print(st, et)
    get_cost_older.run(st, et)

-    #同步到ck
+    # sync to ClickHouse
    do_cost(st, et)

-    #2.数据处理
+    # 2. process the data
     src_book_info()  # 书籍卡点信息
     # book_annual_expect_profit.run() # 年预期收益
     dw_channel_daily()
@@ -29,6 +35,74 @@ def get_data():
     dm_pitcher_daily_overview()
 
 
+def get_data_vx(channel, st, et):
+    # pull and store cost data for a single WeChat (vx) channel
+    sql = '''select account_id,wechat_account_id,access_token,refresh_token,name,
+                ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx
+                where account_id in (select account_id from channel_by_account_daily
+                where channel ='{}' 
+                order by dt desc  )
+                '''.format(channel)
+
+    token_list_v = db.quchen_text.getData(sql)
+    print(token_list_v)
+    time1 = time.time()
+    li = []
+    for y in token_list_v:
+        get_v_data(y, li, st, et)
+    for _ in li:
+        print(_)
+    print('get_daily_vx:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
+    mysql_insert_daily_vx(li)
+    # do_cost(st, et)
+    # src_book_info()  # 书籍卡点信息
+    # dw_channel_daily()
+    # dw_pitcher_trend()
+    # book_trend()
+    # dm_pitcher_daily_overview()
+
+
+def get_data_gdt(channel, st, et):
+    # 用于处理单个微信号相关信息
+    sql = '''select account_id,'',access_token,refresh_token,name,
+                ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq
+                where account_id in (select account_id from channel_by_account_daily
+                where channel ='{}' 
+                order by dt desc  )
+                '''.format(channel)
+    print(sql)
+    token_list_v = db.quchen_text.getData(sql)
+    print(token_list_v)
+    time1 = time.time()
+    li = []
+    for y in token_list_v:
+        get_q_data(y, li, st, et)
+    for _ in li:
+        print(_)
+    print('get_daily_qq:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
+    mysql_insert_daily_qq(li)
+    do_cost(st, et)
+    src_book_info()  # 书籍卡点信息
+    dw_channel_daily()
+    dw_pitcher_trend()
+    book_trend()
+    dm_pitcher_daily_overview()
+
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        handlers=[
+            logging.handlers.RotatingFileHandler('./cost_data.log',
+                                                 maxBytes=10 * 1024 * 1024,
+                                                 backupCount=5,
+                                                 encoding='utf-8')
+            , logging.StreamHandler()  # also echo to the console
+        ],
+        level=logging.INFO,
+        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
+    )
+    st = du.get_n_days(-20)
+    et = du.get_n_days(0)
+    get_data_vx(channel='安宜文海', st=st, et=et)
 
-if __name__=="__main__":
-    get_data()
+    # get_data(st,et)
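One detail worth noting in the `__main__` block: `logging.handlers` is a submodule, so `import logging` alone does not make `logging.handlers.RotatingFileHandler` resolvable; the `from logging import handlers` line is what actually loads it. A minimal standalone version of the same setup:

import logging
import logging.handlers  # required; `import logging` alone does not load the submodule

logging.basicConfig(
    handlers=[
        logging.handlers.RotatingFileHandler('./cost_data.log',
                                             maxBytes=10 * 1024 * 1024,
                                             backupCount=5,
                                             encoding='utf-8'),
        logging.StreamHandler(),  # echo to the console
    ],
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s",
)
logging.info('rotating file + console logging configured')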

+ 54 - 32
example/update_order_data.py

@@ -14,6 +14,8 @@ from app.etl.sync_to_ck_task import order_sync_ck
 from app.api_data.platform_order import yangguang
 from app.api_data.tx_ad_cost import get_cost_older
 from app.etl.data_stat_run import do_cost
+from logging import handlers
+
 
 du = DateUtils()
 
@@ -28,52 +30,72 @@ def order_date_get(st,et):
     t7 = threading.Thread(target=yueweng, args=(st, et))
     t8 = threading.Thread(target=yangguang, args=(st, et))
     t9 = threading.Thread(target=youshuge, args=(st, et))
-    t1.start()
-    t1.join()
-    t2.start()
-    t2.join()
-    t3.start()
-    t3.join()
-    t4.start()
-    t4.join()
-    t5.start()
-    t5.join()
-    t6.start()
-    t6.join()
-    t7.start()
-    t7.join()
-    t8.start()
-    t8.join()
-    t9.start()
-    t9.join()
-
+    t10 = threading.Thread(target=guofeng, args=(st, et))
+    # t1.start()
+    # t1.join()
+    # t2.start()
+    # t2.join()
+    # t3.start()
+    # t3.join()
+    # t4.start()
+    # t4.join()
+    # t5.start()
+    # t5.join()
+    # t6.start()
+    # t6.join()
+    # t7.start()
+    # t7.join()
+    # t8.start()
+    # t8.join()
+    # t9.start()
+    # t9.join()
+    t10.start()
+    t10.join()
     yangguang.get_channel_info()
 
 def update_order():
     # 1.获取数据
 
-    st = du.get_n_days(-2)
+    st = du.get_n_days(-30)
     et = du.get_n_days(0)
-    print(st, et)
+    # print(st, et)
     order_date_get(st,et)
+    # do_order(st, et)
+    # src_book_info()  # 书籍卡点信息
+    # # book_annual_expect_profit.run() # 年预期收益
+    # dw_channel_daily()
+    # dw_pitcher_trend()
+    # book_trend()
+    # dm_pitcher_daily_overview()
 
-    st = du.get_n_days(-1)
-    et = du.get_n_days(0)
-    do_order(st, et)
-    src_book_info()  # 书籍卡点信息
-    # book_annual_expect_profit.run() # 年预期收益
-    dw_channel_daily()
-    dw_pitcher_trend()
-    book_trend()
-    dm_pitcher_daily_overview()
+def update_order_zhangzhongyun():
+    zhangzhongyun(start=du.get_n_days(-15), end=du.get_n_days(0))

+def update_order_guofeng():
+    guofeng(start=du.get_n_days(-15), end=du.get_n_days(0))
 
 def update_order_do():
-    st = du.get_n_days(-10)
+    st = du.get_n_days(-30)
     et = du.get_n_days(0)
     print(st, et)
     do_order(st, et)
 
 if __name__=='__main__':
+    logging.basicConfig(
+        handlers=[
+            logging.handlers.RotatingFileHandler('./update_order_data.log',
+                                                 maxBytes=10 * 1024 * 1024,
+                                                 backupCount=5,
+                                                 encoding='utf-8')
+            , logging.StreamHandler()  # also echo to the console
+        ],
+        level=logging.INFO,
+        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
+    )
     # yangguang.get_channel_info()
-    update_order_do()
+    # update_order()
+    # update_order()
+    # update_order_zhangzhongyun()
+    update_order_guofeng()
+
+
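A side note on `order_date_get`: each thread there is started and immediately joined, so the platform fetches always ran one after another and never overlapped. If the commented-out platforms come back, starting everything before joining restores real concurrency. A sketch, with `fetch_platform` as a hypothetical stand-in for the per-platform fetchers:

import threading
import time

def fetch_platform(name, st, et):
    time.sleep(1)  # hypothetical stand-in for a huasheng/wending/guofeng fetch
    print('{}: fetched orders for {}~{}'.format(name, st, et))

def order_date_get_parallel(st, et, platforms):
    threads = [threading.Thread(target=fetch_platform, args=(p, st, et))
               for p in platforms]
    for t in threads:  # start them all first...
        t.start()
    for t in threads:  # ...then wait; wall time is roughly the slowest fetch, not the sum
        t.join()

order_date_get_parallel('2021-05-01', '2021-05-07', ['huasheng', 'wending', 'guofeng'])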

+ 81 - 0
example/update_video.py

@@ -0,0 +1,81 @@
+import requests
+import cv2
+import sys
+import pandas
+from model.sql_models import DB
+from config.using_config import quchen_text, dm
+
+
+def update_video_info():
+    # 1. fetch every row that has a video_url
+    sql = '''
+            select * from video_info vi 
+                where length (preview_url )>0;
+            '''
+    df = pandas.read_sql(sql=sql, con=db_qc.engine)
+    print(df)
+    for index, row in df.iterrows():
+
+        # 2. probe the video: size, duration, format
+        # video_url = 'http://wxsnsdy.wxs.qq.com/131/20210/snssvpdownload/SH/reserved/ads_svp_video__0b53qybncaaciaadpfl53jqbrbqe2gdafuka.f0.mp4?dis_k=4a9de877e9ee8dffe7a12a55700c7c0e&dis_t=1618994896&m=f4ed07a998cc60ba25ec1c2425176ea8'
+        video_url = row['preview_url']
+        rsp = requests.get(video_url)
+        with open('aa.mp4', 'wb') as f:
+            f.write(rsp.content)
+        video_size = len(rsp.content)
+        cap = cv2.VideoCapture('aa.mp4')  # the downloaded video stream
+        if not cap.isOpened():
+            continue  # unreadable download: skip it, otherwise `duration` below is unbound
+        rate = cap.get(cv2.CAP_PROP_FPS)
+        frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
+        print(frame_num, rate)
+        duration = frame_num / rate
+        print(video_size, duration)
+        # byte_rate = video_size / 1024 * 8 / duration
+        byte_rate = (video_size/(duration/8))
+        print(byte_rate)
+        # TODO: the bitrate figure is off; recompute it later
+
+        # 3. persist
+        sql = '''
+        update video_info
+        set size={},video_length={},type='mp4',byte_rate={}
+        where video_id='{}'
+        '''.format(video_size, duration, byte_rate, row['video_id'])
+        db_qc.session.execute(sql)
+        db_qc.session.commit()
+
+def update_byte_rate():
+    sql = '''
+                select * from video_info vi 
+                    where length (preview_url )>0;
+                '''
+    df = pandas.read_sql(sql=sql, con=db_qc.engine)
+    print(df)
+    for index, row in df.iterrows():
+
+        # 2. reuse the stored size and duration
+        # video_url = 'http://wxsnsdy.wxs.qq.com/131/20210/snssvpdownload/SH/reserved/ads_svp_video__0b53qybncaaciaadpfl53jqbrbqe2gdafuka.f0.mp4?dis_k=4a9de877e9ee8dffe7a12a55700c7c0e&dis_t=1618994896&m=f4ed07a998cc60ba25ec1c2425176ea8'
+        video_url = row['preview_url']
+        video_size = row['size']
+        duration = row['video_length']
+
+        # byte_rate = video_size / 1024 * 8 / duration
+        byte_rate = (video_size / (duration / 8))
+        print(byte_rate)
+        # TODO: the bitrate figure is off; recompute it later
+
+        # 3. persist
+        sql = '''
+            update video_info
+            set size={},video_length={},type='mp4',byte_rate={}
+            where video_id='{}'
+            '''.format(video_size, duration, byte_rate, row['video_id'])
+        db_qc.session.execute(sql)
+        db_qc.session.commit()
+
+
+if __name__ == '__main__':
+    db_qc = DB(config=quchen_text)
+    db_dm = DB(config=dm)
+    # update_video_info()
+    update_byte_rate()
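On the bitrate TODO above: the two formulas differ only in units. The commented-out `video_size / 1024 * 8 / duration` yields kilobits per second, while the active `video_size / (duration / 8)` simplifies to `video_size * 8 / duration`, that is, bits per second stored in a column named `byte_rate`. A quick unit check with illustrative numbers:

video_size = 1048576  # bytes (a 1 MiB download)
duration = 10.0       # seconds

bits_per_second = video_size * 8 / duration              # 838860.8 bit/s
kilobits_per_second = video_size * 8 / 1024 / duration   # 819.2 kbit/s
bytes_per_second = video_size / duration                 # 104857.6 B/s, an actual byte rate

assert video_size / (duration / 8) == bits_per_second    # the formula in this diff
print(bits_per_second, kilobits_per_second, bytes_per_second)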

+ 41 - 43
model/DataBaseOperation.py

@@ -8,6 +8,7 @@ import pymysql
 import logging as log
 import pandas as pd
 import time
+
 pd.set_option('display.max_columns', None)
 pd.set_option('display.width', 1000)
 MYSQL_DEBUG = 1
@@ -16,6 +17,7 @@ MYSQL_DEBUG = 1
 class MysqlOperation:
 
     def __init__(self, host, user, passwd, db, port=3306):
+        # TODO: refactor this; the connection attempt must not be swallowed by try/except
         try:
             self.conn = pymysql.connect(host=host,
                                         user=user,
@@ -27,13 +29,12 @@ class MysqlOperation:
         except Exception as e:
             log.info(e)
 
-
     def set_dict_cursor(self):
         """
         设置字典形式取数据
         """
         self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
-    
+
     def getData(self, sql, args=None):
         """
 
@@ -51,19 +52,19 @@ class MysqlOperation:
             log.info('sql cost: %s' % (time.time() - start))
         return result
 
-    def get_data_list(self,sql,arg=None):
+    def get_data_list(self, sql, arg=None):
         """
         :param sql:
         :param arg:
         :return: list[list]
         """
-        data=self.getData(sql,arg)
-        li=[]
+        data = self.getData(sql, arg)
+        li = []
         for i in data:
             li.append(list(i))
         return li
 
-    def getDataOneList(self,sql):
+    def getDataOneList(self, sql):
         """获取一列"""
         data = self.getData(sql)
         li = []
@@ -71,13 +72,12 @@ class MysqlOperation:
             li.append(i[0])
         return li
 
-
-    def execute(self, sql,data=None):
+    def execute(self, sql, data=None):
         start = time.time()
         if data:
-            k=self.cursor.execute(sql,data)
+            k = self.cursor.execute(sql, data)
         else:
-            k=self.cursor.execute(sql)
+            k = self.cursor.execute(sql)
         self.conn.commit()
         # if MYSQL_DEBUG:
         #
@@ -85,18 +85,16 @@ class MysqlOperation:
         #     log.info('sql cost: %s' % (time.time() - start))
         log.info(f"affect rows :{k}")
 
-
-
-    def executeMany(self,sql,data):
+    def executeMany(self, sql, data):
         start = time.time()
-        k=self.cursor.executemany(sql,data)
+        k = self.cursor.executemany(sql, data)
         self.conn.commit()
         # if MYSQL_DEBUG:
         #     log.info('sql: \n' + sql)
         #     log.info('sql cost: %s' % (time.time() - start))
         log.info(f"\033[1;36maffect rows :{k} \033[0m")
 
-    def getOne(self,sql, args=None):
+    def getOne(self, sql, args=None):
         result = self.getData(sql, args)
 
         return result[0][0]
@@ -120,6 +118,14 @@ class MysqlOperation:
             log.info('sql cost: %s' % (time.time() - start))
         return df
 
+    def pd_data_sql(self, sql):
+        """Read a query into a DataFrame with every column cast to dtype
+        object, so cells come back as plain Python values (NaN included)
+        rather than numpy scalars."""
+        df = pd.read_sql(sql=sql, con=self.conn)
+        for index, df_type in df.dtypes.items():
+            if df_type != 'object':
+                df[index] = df[index].astype('object')
+        return df
+
     # def insertData(self, sql, args=None):
     #     # if args:
     #     #     log.debug(sql % tuple(args))
@@ -297,7 +303,7 @@ class MysqlOperation:
         else:
             last_data = -1
             repeat_key = set()
-            for i in sorted(tmp,key=lambda x:str(x)):
+            for i in sorted(tmp, key=lambda x: str(x)):
                 if last_data == i:
                     repeat_key.add(i)
                 if len(repeat_key) >= 10:
@@ -311,8 +317,8 @@ class MysqlOperation:
         if isinstance(data, pd.DataFrame):
             # np.nan != np.nan 从而判断值为np.nan
             list_data = [map(lambda x: None if x != x else x, list(data.iloc[_, :])) for _ in range(len(data))]
-            li =[]
-            for  i in list_data:
+            li = []
+            for i in list_data:
                 li.append(list(i))
             list_data = li
 
@@ -377,7 +383,8 @@ class MysqlOperation:
             length = len(tag_values)
             for i in range(0, length, split):
                 start, finish = i, i + split
-                self.insertorupdatemany_v2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
+                self.insertorupdatemany_v2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
+                                           split=split)
             return
         if len(key_values) == 0 or len(tag_values) == 0:
             log.debug('insert or update 0 rows')
@@ -409,7 +416,6 @@ class MysqlOperation:
                          flag=flag,
                          split=split)
 
-
     def updateManyV2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
         if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
             log.error('Type Error')
@@ -419,7 +425,8 @@ class MysqlOperation:
             length = len(tag_values)
             for i in range(0, length, split):
                 start, finish = i, i + split
-                self.updateManyV2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
+                self.updateManyV2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
+                                  split=split)
             return
         if len(key_values) == 0 or len(tag_values) == 0:
             log.info('update 0 rows')
@@ -445,7 +452,7 @@ class MysqlOperation:
             keys=key,
             tags=tag,
             tag_values=df[tag],
-            key_values=df[key],split=1000
+            key_values=df[key], split=1000
         )
 
     def insertorupdatemany_v3(self, df, table, keys, tags, flag=False, split=80):
@@ -468,7 +475,6 @@ class MysqlOperation:
             key_values=df[key]
         )
 
-
     def _get_s_format(self, data):
         """
         Args:
@@ -546,7 +552,8 @@ class MysqlOperation:
             length = len(tag_values)
             for i in range(0, length, split):
                 start, finish = i, i + split
-                self.insert_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
+                self.insert_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
+                                 split=split)
             return
         tag_values = self._convert_to_list(tag_values)
         key_values = self._convert_to_list(key_values)
@@ -565,7 +572,7 @@ class MysqlOperation:
             log.info(sql_insert % tuple(value_insert))
         t0 = time.time()
 
-        self.cursor.execute(sql_insert,tuple(value_insert))
+        self.cursor.execute(sql_insert, tuple(value_insert))
         log.info('insert %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
         self.conn.commit()
 
@@ -607,7 +614,8 @@ class MysqlOperation:
             length = len(tag_values)
             for i in range(0, length, split):
                 start, finish = i, i + split
-                self.update_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
+                self.update_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
+                                 split=split)
             return
         if len(key_values) == 0 or len(tag_values) == 0:
             log.info('update 0 rows')
@@ -660,29 +668,19 @@ class MysqlOperation:
         self.conn.commit()
         log.info('update %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
 
-
-    def getColumn(self,table,flag=0):
+    def getColumn(self, table, flag=0):
         "获取表的所有列"
-        sql="SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " \
-            "WHERE `TABLE_NAME`='{}' ORDER BY ordinal_position".format(table)
+        sql = "SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " \
+              "WHERE `TABLE_NAME`='{}' ORDER BY ordinal_position".format(table)
         self.cursor.execute(sql)
-        a= self.cursor.fetchall()
-        str=''
-        li=[]
+        a = self.cursor.fetchall()
+        str = ''
+        li = []
         for i in a:
-            str+=i[0]+','
+            str += i[0] + ','
             li.append(i[0])
 
         if flag:
             return li
         else:
             return str[:-1]
-
-
-
-
-
-
-
-
-
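A note on the new `pd_data_sql`: `read_sql` returns numpy dtypes, and numpy scalars such as `numpy.int64` are not plain Python values, which tends to trip up DB drivers and the `x != x` NaN-to-None mapping used by `dfsave2mysql`. Casting columns to `object` makes every cell an ordinary Python value. A small demonstration (illustrative, not part of the module):

import numpy as np
import pandas as pd

df = pd.DataFrame({'n': [1, 2], 'x': [1.5, np.nan]})
print(df.dtypes)  # n: int64, x: float64

obj = df.astype(object)
row = list(obj.iloc[1, :])                   # [2, nan] as plain Python values
row = [None if v != v else v for v in row]   # NaN != NaN, so NaN becomes None
print(row)                                   # [2, None]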

+ 38 - 45
model/DataBaseUtils.py

@@ -10,6 +10,7 @@ import pandas as pd
 from clickhouse_driver.client import Client
 import logging
 
+
 def db_config():
     p_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
     path = os.path.join(p_path, "config", "db_config.yaml")
@@ -17,12 +18,13 @@ def db_config():
     config = yaml.load(f.read(), Loader=yaml.FullLoader)
     return config
 
+
 class MysqlUtils:
     _quchen_text = None
-    _zx=None
-    _zx_ads=None
-    _dm=None
-    _zx_test =None
+    _zx = None
+    _zx_ads = None
+    _dm = None
+    _zx_test = None
 
     def __init__(self):
         self.config = db_config()
@@ -32,9 +34,9 @@ class MysqlUtils:
 
         conf = self.config['quchen_text']
         self._quchen_text = MysqlOperation(host=conf['host'],
-                                      user=conf['user'],
-                                      passwd=conf['passwd'],
-                                      db=conf['db'])
+                                           user=conf['user'],
+                                           passwd=conf['passwd'],
+                                           db=conf['db'])
         return self._quchen_text
 
     @property
@@ -50,21 +52,20 @@ class MysqlUtils:
     def zx_ads(self):
         conf = self.config['zx_ads']
         self._zx_ads = MysqlOperation(host=conf['host'],
-                                  user=conf['user'],
-                                  passwd=conf['passwd'],
-                                  db=conf['db'])
+                                      user=conf['user'],
+                                      passwd=conf['passwd'],
+                                      db=conf['db'])
         return self._zx_ads
 
     @property
     def zx_test(self):
         conf = self.config['zx_test']
         self._zx_test = MysqlOperation(host=conf['host'],
-                                  user=conf['user'],
-                                  passwd=conf['passwd'],
-                                  db=conf['db'])
+                                       user=conf['user'],
+                                       passwd=conf['passwd'],
+                                       db=conf['db'])
         return self._zx_test
 
-
     @property
     def dm(self):
         conf = self.config['dm']
@@ -74,7 +75,6 @@ class MysqlUtils:
                                   db=conf['db'])
         return self._dm
 
-
     def find_db(self, db):
 
         if db == "quchen_text":
@@ -105,91 +105,84 @@ class MysqlUtils:
             self._zx_ads.cursor.close()
             self._zx_ads.conn.close()
 
+
 class CkUtils:
 
     def __init__(self):
-        self.config=db_config()
+        self.config = db_config()
         conf = self.config['clickhouse']
         self.client = Client(host=conf['host'],
-                           user=conf['user'],
-                           password=conf['passwd'],
-                           port=conf['port'],
-                           send_receive_timeout=5)
+                             user=conf['user'],
+                             password=conf['passwd'],
+                             port=conf['port'],
+                             send_receive_timeout=5)
 
     def execute(self, sql):
 
         return self.client.execute(sql)
 
-    def getData_pd(self,sql,col):
+    def getData_pd(self, sql, col):
         """
 
         :param sql:
         :param col: ['a','b']
         :return:
         """
-        data=self.execute(sql)
-        df=pd.DataFrame(data,columns=col)
+        data = self.execute(sql)
+        df = pd.DataFrame(data, columns=col)
         return df
 
-    def getData_pdv2(self,sql):
-        data = self.client.execute_iter(sql,with_column_types=True)
+    def getData_pdv2(self, sql):
+        data = self.client.execute_iter(sql, with_column_types=True)
         columns = [column[0] for column in next(data)]
         df = pd.DataFrame.from_records(data, columns=columns)
         return df
 
-
-    def getData_json(self,sql):
+    def getData_json(self, sql):
         return self.getData_pdv2(sql).to_json(orient='records')
 
-    def getColumns(self,table,is_list=False):
-        data=self.execute("desc "+table)
-        li=[]
+    def getColumns(self, table, is_list=False):
+        data = self.execute("desc " + table)
+        li = []
         str = ''
         for i in data:
             li.append(i[0])
-            str+=i[0]+','
+            str += i[0] + ','
         if is_list:
             return li
         else:
             return str[:-1]
 
-
-
-
-
-
-
-    def insertMany(self,table,col,data):
+    def insertMany(self, table, col, data):
         """
         :param table: 表名 srt
         :param col:   字段名 srt   eg: ”a,b,c“
         :param data:  tuple/list
         :return:
         """
-        max=1000
-        sql="insert into {} ({}) values ".format(table,col)
+        max = 1000
+        sql = "insert into {} ({}) values ".format(table, col)
 
         if len(data) == 0:
             logging.info("data.len==0")
             return
         if len(data) <= max:
-            sql = sql+str(data)[1:-1]
+            sql = sql + str(data)[1:-1]
             # log.info(sql)
             # log.info("insert {} rows".format(len(data)))
             self.execute(sql)
             return
         else:
 
-            sql2=sql+str(data[:max])[1:-1]
+            sql2 = sql + str(data[:max])[1:-1]
             # log.info(sql2)
             self.execute(sql2)
             # log.info("insert {} rows".format(max))
-            self.insertMany(table,col,data[max:])
+            self.insertMany(table, col, data[max:])
 
 
 if __name__ == '__main__':
     # p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
     # print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
-    ck=CkUtils()
+    ck = CkUtils()
     print(ck.execute("desc order"))
-
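`CkUtils.insertMany` batches by splicing `str(data)[1:-1]` into the SQL text, which relies on Python tuple repr happening to be valid ClickHouse literals and will break on values containing quotes. `clickhouse_driver` can batch and escape by itself when the row list is passed as the second argument to `execute`; a sketch of that alternative (table and credentials illustrative):

from clickhouse_driver import Client

client = Client(host='localhost', user='default', password='', port=9000)

rows = [(1, "o'reilly"), (2, 'plain')]  # embedded quotes are fine: the driver escapes them
client.execute(
    'INSERT INTO demo_table (id, name) VALUES',  # demo_table is hypothetical
    rows,
)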