
MOD: add media information

cxyu 3 years ago
parent
commit
fa0bc28d7e
6 changed files with 256 additions and 30 deletions
  1. .gitignore (+3 -1)
  2. app/api_data/tx_ad_cost/cost_util.py (+18 -26)
  3. app/crontab_task/minuteRun.py (+114 -0)
  4. app/etl/dw/dw_image_cost_day.py (+7 -3)
  5. data_processing/video_processing.py (+79 -0)
  6. example/update_video.py (+35 -0)

+ 3 - 1
.gitignore

@@ -1,4 +1,6 @@
 .idea/
 target/
 *.log
-*.pyc
+*.pyc
+using_config.py
+using_config_test.py
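
Note: using_config.py and using_config_test.py are now git-ignored, yet the new code imports them (from config import using_config_test as db_config, DB(db_config.zx_ads), DB(config=using_config.quchen_text)). Their real contents are not part of this commit; a minimal sketch of the assumed shape, with placeholder values only:

# config/using_config_test.py, hypothetical sketch: only the attribute names
# (zx_ads, quchen_text) appear in this commit; the dict shape and keys are assumptions.
zx_ads = {
    'host': '127.0.0.1',        # placeholder
    'port': 3306,
    'user': 'zx_user',          # placeholder
    'password': 'change-me',    # placeholder
    'db': 'zx_ads',
    'charset': 'utf8mb4',
}

quchen_text = {
    'host': '127.0.0.1',        # placeholder
    'port': 3306,
    'user': 'qc_user',          # placeholder
    'password': 'change-me',    # placeholder
    'db': 'quchen_text',
    'charset': 'utf8mb4',
}

Whatever the real shape is, keeping both variants out of version control is exactly what the .gitignore change is for.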

+ 18 - 26
app/api_data/tx_ad_cost/cost_util.py

@@ -10,10 +10,7 @@ from model.ComUtils import *
 from model.DateUtils import DateUtils
 from PIL import Image
 from io import BytesIO
-import cv2
-import oss2
-import os
-import ffmpeg
+from data_processing import video_processing
 
 du = DateUtils()
 db = MysqlUtils()
@@ -294,26 +291,19 @@ def images_info_get(account_id, access_token, image_ids):  # fetch image info
 def video_info_get(account_id, access_token, image_ids):  # fetch video info
     # API: https://developers.e.qq.com/docs/api/business_assets/video/videos_get?version=1.3
 
-
-    def get_video_info(video_url, err_num=0):
+    def get_video_info(video_url, signature, err_num=0):
         try:
-            if video_url:
-                return None, None
-            rsp = requests.get(video_url)
-            with open('/tmp/aa.mp4', 'wb') as f:
-                f.write(rsp.content)
-            video_size = len(rsp.content)
-            cap = cv2.VideoCapture('/tmp/aa.mp4')  # 视频流
-            if cap.isOpened():
-                rate = cap.get(5)
-                frame_num = cap.get(7)
-                duration = frame_num / rate
-            byte_rate = (video_size / (duration / 8))
-            return duration, byte_rate
-        except:
+            if not video_url:
+                return None, None, None, None
+            cloud_filepath, metadata_title, video_size, duration, bit_rate, width, height, format = video_processing.change_format(
+                video_url, signature)
+            return duration, bit_rate, metadata_title, cloud_filepath
+        except Exception as e:
+            logging.error(str(e))
             if err_num < 5:
-                return get_video_info(video_url, err_num=err_num + 1)
-            return None, None
+                return get_video_info(video_url, signature, err_num=err_num + 1)
+            else:
+                return None, None, None, None
 
     # 1. Query the database to see whether the data still needs to be fetched
     id_content = ','.join([''' '{}' '''.format(i) for i in image_ids.split(',')])
@@ -380,16 +370,18 @@ def video_info_get(account_id, access_token, image_ids):  # fetch video info
                 break
     data = []
     for i in li:
-        duration, byte_rate = get_video_info(i['preview_url'])
+        # TODO: do not re-run the computation for videos with the same signature
+        duration, byte_rate, metadata_title, cloud_filepath = get_video_info(i['preview_url'], i['signature'])
         data.append((i['video_id'], i['width'], i['height'],
                      i['signature'], i['preview_url'], i['file_size'],
-                     'mp4', byte_rate, duration))
+                     'mp4', byte_rate, duration, metadata_title, cloud_filepath))
     logging.info(f"{account_id} 获取到新视频:" + str(li.__len__()))
     if li.__len__() > 0:
         sql = '''insert IGNORE into video_info (video_id,width,height,
-        signature,preview_url,size,type,byte_rate,video_length)
+        signature,preview_url,size,type,byte_rate,video_length,
+        video_meta_data,download_path)
          value (%s,%s,%s,
-         %s,%s,%s,%s,%s,%s)'''
+         %s,%s,%s,%s,%s,%s,%s,%s)'''
         db.quchen_text.executeMany(sql, data)
         db.close()
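
Note: the extended insert writes two extra values per row, so the video_info table must already have video_meta_data and download_path columns. Only the column names appear in this commit; a hypothetical one-off migration (types are guesses, and the execute call mirrors how db.dm.execute is used in dw_image_cost_day.py):

# Hypothetical migration, not part of this commit: column names come from the insert
# above; varchar lengths are guesses ('zx_' + md5 is 35 chars, paths look like 'video/zx_<md5>.mp4').
db.quchen_text.execute('''
    alter table video_info
        add column video_meta_data varchar(64)  default null,
        add column download_path   varchar(255) default null
''')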
 

+ 114 - 0
app/crontab_task/minuteRun.py

@@ -0,0 +1,114 @@
+import logging
+from logging import handlers
+import os
+from model.sql_models import DB
+from data_processing import video_processing
+# TODO: switch to the production config before going live
+from config import using_config_test as db_config
+
+
+def image_check():
+    # TODO: image assets, apply the corresponding changes
+    pass
+
+
+def video_check():
+    # 1. Find rows in the media table where the bit rate is empty but download_path is set
+    sql = '''select download_path from t_ads_media tam 
+        where download_path is not null
+        and video_bit_rate is null
+       and type =2 ;
+    '''
+    cursor = zx_db.session.execute(sql)
+    download_path_list = []
+    for line in cursor.fetchall():
+        download_path_list.append(line[0])
+    zx_db.session.commit()
+    # 2. Check whether the signature (md5) already exists in media; if it does, keep using the original info
+    for download_path in download_path_list:
+        # TODO: when there is time, also remove the originally uploaded video
+        download_url = 'https://zx-media-database.oss-cn-hangzhou.aliyuncs.com/' + download_path
+        logging.info('开始解析视频:{} '.format(download_url))
+
+        # 3. Parse the video
+        cloud_filepath, metadata_title, video_size, duration, bit_rate, width, height, format = video_processing.change_format(
+            download_url)
+        # Last step: update both the media and idea tables
+
+        type = 2
+        signature = metadata_title[3:]
+        media_size = video_size
+        media_format = format
+        video_length = duration
+        video_bit_rate = bit_rate
+        max_media_size = media_size
+        height = height
+        width = width
+        aspect_ratio = round(width / (height if height else 1), 1)
+        media_sql = '''
+            update t_ads_media 
+                        set video_mate_data='{video_mate_data}',
+                        type='{type}' ,signature='{signature}', media_size='{media_size}',
+                        media_format = '{media_format}', video_length='{video_length}',
+                        video_bit_rate = '{video_bit_rate}', max_media_size='{max_media_size}',
+                        height='{height}', width='{width}', aspect_ratio = '{aspect_ratio}'
+                        where download_path='{download_path}';
+        '''.format(download_path=download_path, video_mate_data=metadata_title, type=type,
+                   signature=signature, media_size=media_size,
+                   media_format=media_format, video_length=video_length,
+                   video_bit_rate=video_bit_rate, max_media_size=max_media_size,
+                   height=height, width=width, aspect_ratio=aspect_ratio)
+
+        idea_sql = '''
+            update t_ads_idea 
+                set signature='{signature}', video_mate_data='{video_mate_data}',
+                         media_size='{media_size}',type='{type}' ,
+                        media_format = '{media_format}',
+                         video_length='{video_length}',
+                        video_bit_rate = '{video_bit_rate}', max_media_size='{max_media_size}',
+                        height='{height}', width='{width}', aspect_ratio = '{aspect_ratio}'
+                        where download_path='{download_path}'
+        '''.format(download_path=download_path, video_mate_data=metadata_title, type=type,
+                   signature=signature, media_size=media_size,
+                   media_format=media_format, video_length=video_length,
+                   video_bit_rate=video_bit_rate, max_media_size=max_media_size,
+                   height=height, width=width, aspect_ratio=aspect_ratio)
+        zx_db.session.execute(media_sql)
+        zx_db.session.execute(idea_sql)
+        zx_db.session.commit()
+
+
+def run():
+    # 1. Check whether the previous run has finished
+    result = os.popen('ps aux | grep python | grep minuteRun.py | grep -v grep')
+    res = result.read()
+    for line in res.splitlines():
+        print(line)
+    print(len(res.splitlines()))
+    if len(res.splitlines()) > 1:
+        logging.info('前面有视频检查程序在运行')
+        return
+    else:
+        video_check()
+
+
+if __name__ == '__main__':
+    # Periodically check whether the video data in the media library is complete
+    # Ad creatives also need checking: images may have been uploaded passively
+    #
+
+    logging.basicConfig(
+        handlers=[
+            logging.handlers.RotatingFileHandler('./log/minute_Run.log',
+                                                 maxBytes=10 * 1024 * 1024,
+                                                 backupCount=5,
+                                                 encoding='utf-8')
+            , logging.StreamHandler()  # also print to the console
+        ],
+        level=logging.INFO,
+        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
+    )
+    zx_db = DB(db_config.zx_ads)
+    logging.info('视频检查,开始')
+    run()
+    logging.info('视频检查,结束')
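
Note: video_check() builds media_sql and idea_sql with str.format, so any quote inside an interpolated value breaks the statement. Since zx_db.session is already driven through execute()/commit(), and assuming it is a SQLAlchemy session, the same t_ads_media update can use bound parameters instead; a sketch, not part of the commit:

from sqlalchemy import text

# Same update as media_sql above, with bound parameters instead of str.format.
media_stmt = text('''
    update t_ads_media
       set video_mate_data = :meta, type = :type, signature = :sig,
           media_size = :size, media_format = :fmt, video_length = :length,
           video_bit_rate = :bit_rate, max_media_size = :max_size,
           height = :height, width = :width, aspect_ratio = :ratio
     where download_path = :path
''')
zx_db.session.execute(media_stmt, {
    'meta': metadata_title, 'type': 2, 'sig': metadata_title[3:],
    'size': video_size, 'fmt': format, 'length': duration,
    'bit_rate': bit_rate, 'max_size': video_size,
    'height': height, 'width': width,
    'ratio': round(width / (height or 1), 1), 'path': download_path,
})

The idea_sql update would follow the same pattern.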

+ 7 - 3
app/etl/dw/dw_image_cost_day.py

@@ -57,6 +57,8 @@ def run(dt):
         type = ''
         video_length = None
         video_bit_rate = None
+        video_meta_data = None
+        download_path = None
         for j in i[-1].split(','):
             if image_di.get(j):
                 image_id = image_id + ',' + j
@@ -84,11 +86,13 @@ def run(dt):
         i.append(type[1:])
         i.append(video_length)
         i.append(video_bit_rate)
+        i.append(video_meta_data)
+        i.append(download_path)
 
     # exit(0)
     sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
             title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
-            g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate 
+            g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
@@ -105,7 +109,7 @@ def run(dt):
     # store the results
     db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
     db.dm.executeMany(
-        "replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
+        "replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
         data)
 
 
@@ -129,7 +133,7 @@ def day():
 if __name__ == '__main__':
     # run('2021-05-18')
 
-    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(0)):
+    for i in du.getDateLists(du.get_n_days(0), du.get_n_days(0)):
         print(i)
         run(i)
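
Note: the replace-into now needs 28 placeholders (the 26 original values plus video_meta_data and download_path appended in run()). Counting the %s tokens by hand is easy to get wrong; a small sketch that derives them instead:

# Build the placeholder list from the value count instead of counting %s by hand.
placeholders = ','.join(['%s'] * 28)
db.dm.executeMany(f"replace into dw_image_cost_day values ({placeholders})", data)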
 

+ 79 - 0
data_processing/video_processing.py

@@ -0,0 +1,79 @@
+import logging
+import time
+import requests
+import hashlib
+import ffmpeg
+import os
+import oss2
+
+
+def change_format(video_url, file_md5=None):
+    now_time = int(time.time() * 1000)
+    input_file = '/tmp/{}.mp4'.format(now_time)
+    output_file = '/tmp/{}.mp4'.format(now_time + 1)
+    rsp = requests.get(video_url)
+    with open(input_file, 'wb') as f:
+        f.write(rsp.content)
+    if not file_md5:
+        file_md5 = hashlib.md5(rsp.content).hexdigest()
+        logging.info(file_md5)  # ac3ee699961c58ef80a78c2434efe0d0
+
+    metadata_title = 'zx_' + file_md5
+    ffmpeg.input(input_file).output(output_file, preset='slower', vcodec='h264',
+                                    metadata='title=' + metadata_title).overwrite_output().run()
+    video_info = ffmpeg.probe(output_file)['format']
+    duration = video_info['duration']
+    video_size = int(video_info['size'])
+    bit_rate = (video_size / (float(duration) / 8))
+    probe = ffmpeg.probe(input_file)
+    format = probe['format']
+    duration = format['duration']
+    video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
+    width = int(video_stream['width'])
+    height = int(video_stream['height'])
+
+    logging.info('视频时长:{}  视频大小:{}  视频比特率:{} 视频宽:{} 视频长:{}'.format(duration, video_size, bit_rate, width, height))
+
+    cloud_filepath = 'video/{}.mp4'.format(metadata_title)
+
+    video_format=video_url.split('.')[-1]
+    format='mp4' if len(video_format)>4 else video_format
+
+    # upload the video
+    update_to_aliyun(local_filepath=output_file, cloud_filepath=cloud_filepath)
+
+    # remove the local temp files
+    os.remove(input_file)
+    os.remove(output_file)
+    return cloud_filepath, metadata_title, video_size, duration, bit_rate, width, height, format
+
+
+def update_to_aliyun(local_filepath, cloud_filepath):
+    # TODO: move the OSS credentials into config later
+    access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tGAUywhGk8xDuXosquq')
+    access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'VMVfRv6DdSYuBipIMU5UdtHBYB0Jjb')
+    bucket_name = os.getenv('OSS_TEST_BUCKET', 'zx-media-database')
+    endpoint = os.getenv('OSS_TEST_ENDPOINT', 'http://oss-cn-hangzhou.aliyuncs.com')
+
+    # Create the Bucket object; all object-level operations go through it
+    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
+
+    # Check whether the file already exists in the cloud
+    name_set = set()
+    for i, object_info in enumerate(oss2.ObjectIterator(bucket)):
+        name_set.add(object_info.key)
+    if cloud_filepath not in name_set:
+        logging.info('上传视频' + cloud_filepath)
+        # The object does not exist on the cloud yet, so store it
+
+        # upload the file
+        bucket.put_object_from_file(cloud_filepath, local_filepath)
+
+
+if __name__ == '__main__':
+    import threading
+    for i in range(100):
+        x_thread=threading.Thread(target=change_format,args=('http://wxsnsdy.wxs.qq.com/131/20210/snssvpdownload/SH/reserved/ads_svp_video__0b53qybncaaciaadpfl53jqbrbqe2gdafuka.f110002.mp4?dis_k=2982f4fc3f191cb6199045ca072ed033&dis_t=1618995038&m=beb8a2fd5980bb6fb4f9cef486546338',))
+        x_thread.start()
+
+    change_format('http://wxsnsdy.wxs.qq.com/131/20210/snssvpdownload/SH/reserved/ads_svp_video__0b53qybncaaciaadpfl53jqbrbqe2gdafuka.f110002.mp4?dis_k=2982f4fc3f191cb6199045ca072ed033&dis_t=1618995038&m=beb8a2fd5980bb6fb4f9cef486546338')
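
Note: update_to_aliyun() iterates every object in the bucket just to check one key, which gets slower as the bucket grows. oss2 exposes bucket.object_exists() for a single-key lookup; a sketch of the same guard, not part of the commit:

def upload_if_missing(bucket, cloud_filepath, local_filepath):
    # One existence check for the key instead of listing the whole bucket.
    if not bucket.object_exists(cloud_filepath):
        logging.info('uploading %s', cloud_filepath)
        bucket.put_object_from_file(cloud_filepath, local_filepath)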

+ 35 - 0
example/update_video.py

@@ -74,8 +74,43 @@ def update_byte_rate():
         db_qc.session.commit()
 
 
+def update_video_info_pro():
+    from model.sql_models import DB
+    from config import using_config_test, using_config
+    db_qc = DB(config=using_config.quchen_text)
+    db_qc_test = DB(config=using_config_test.quchen_text)
+    sql_get = '''
+    select 
+      video_id,size, video_length, byte_rate, video_meta_data,download_path
+     from video_info
+    where LENGTH (preview_url )>1 and download_path is not null;
+    '''
+    cursor=db_qc_test.session.execute(sql_get)
+    id_dict={}
+    for line in cursor.fetchall():
+        video_id,size, video_length, byte_rate, video_meta_data, download_path=line
+        id_dict[video_id]=(size, video_length, byte_rate, video_meta_data,download_path)
+
+
+    for k,v in id_dict.items():
+        video_id=k
+        size, video_length, byte_rate, video_meta_data,download_path=v
+        sql_update = '''
+                         update video_info 
+                         set size={}, video_length={},
+                         type='mp4', byte_rate={}, video_meta_data='{}',
+                         download_path='{}'
+                         where 
+                         video_id='{}'
+                        '''.format(size, video_length, byte_rate, video_meta_data, download_path, video_id)
+        db_qc.session.execute(sql_update)
+    db_qc.session.commit()
+
+
 if __name__ == '__main__':
     db_qc = DB(config=quchen_text)
     db_dm = DB(config=dm)
     # update_video_info()
     # update_byte_rate()
+
+    update_video_info_pro()
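
Note: update_video_info_pro() formats one UPDATE string per row, so quotes inside video_meta_data or download_path would break the SQL. Assuming db_qc.session is a SQLAlchemy session (it is already used with execute()/commit() above), the whole backfill can run as one parameterized statement over all rows; a sketch, not part of the commit:

from sqlalchemy import text

update_stmt = text('''
    update video_info
       set size = :size, video_length = :video_length, type = 'mp4',
           byte_rate = :byte_rate, video_meta_data = :video_meta_data,
           download_path = :download_path
     where video_id = :video_id
''')
rows = [dict(video_id=vid, size=s, video_length=vl, byte_rate=br,
             video_meta_data=md, download_path=dp)
        for vid, (s, vl, br, md, dp) in id_dict.items()]
db_qc.session.execute(update_stmt, rows)  # a list of dicts runs as executemany
db_qc.session.commit()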