Просмотр исходного кода

MOD:素材展示部分使用clickhouse

cxyu 3 года назад
Родитель
Коммит
90f501079b

+ 4 - 1
.gitignore

@@ -1,6 +1,9 @@
 .idea/
 target/
+*.log.*
 *.log
 *.pyc
+example/
+leetcd/
 using_config.py
-using_config_test.py
+using_config_test.py

+ 34 - 31
app/etl/dw/dw_book_trend.py

@@ -2,44 +2,47 @@
 @desc 书维度全量表
 @auth ck
 """
-from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils, CkUtils
+from model.sql_models import DB
+from config import using_config
 import logging
 
-du = DateUtils()
-db = MysqlUtils()
-ck = CkUtils()
 
 
 def book_trend():
     logging.info('书籍趋势数据处理,开始')
-    sql = """insert into book_trend
+    sql = """
+    truncate table book_trend;
+    
+    insert into book_trend
     select a.dt,book,type,
-sum(cost) cost,
-sum(order_count),
-sum(order_user),
-sum(order_amount),
-
-sum(first_order_count) first_order_count,
-sum(first_order_user) first_order_user,
-sum(first_order_amount) first_order_amount,
-
-sum(reg_order_count) reg_order_count,
-sum(reg_order_user) reg_order_user,
-sum(reg_order_amount) reg_order_amount,
-sum(view_count) view_count,
-sum(click_count) click_count,
-sum(follow_user) follow_user,
-sum(da1),sum(da2),sum(da3),sum(da4),sum(da5),sum(da6),sum(da7),sum(da8),sum(da9),sum(da10),sum(da11),sum(da12),sum(da13),sum(da14),sum(da15),sum(da16),sum(da17),sum(da18),sum(da19),sum(da20),sum(da21),sum(da22),sum(da23),sum(da24),sum(da25),sum(da26),sum(da27),sum(da28),sum(da29),sum(da30),sum(da31),sum(da32),sum(da33),sum(da34),sum(da35),sum(da36),sum(da37),sum(da38),sum(da39),sum(da40),sum(da41),sum(da42),sum(da43),sum(da44),sum(da45),sum(da46),sum(da47),sum(da48),sum(da49),sum(da50),sum(da51),sum(da52),sum(da53),sum(da54),sum(da55),sum(da56),sum(da57),sum(da58),sum(da59),sum(da60),sum(dm3),sum(dm4),sum(dm5),
-sum(ba1),sum(ba2),sum(ba3),sum(ba4),sum(ba5),sum(ba6),sum(ba7)
-
-from dw_channel a 
-left join dw_channel_amount_daily b on a.dt=b.dt and a.channel=b.channel 
-left join dw_channel_amount_daily_reverse c on a.dt=c.dt and a.channel=c.channel 
-where a.book!=''
-GROUP BY dt,book,type"""
-    db.dm.execute('truncate table book_trend')
-    db.dm.execute(sql)
+    sum(cost) cost,
+    sum(order_count),
+    sum(order_user),
+    sum(order_amount),
+    
+    sum(first_order_count) first_order_count,
+    sum(first_order_user) first_order_user,
+    sum(first_order_amount) first_order_amount,
+    
+    sum(reg_order_count) reg_order_count,
+    sum(reg_order_user) reg_order_user,
+    sum(reg_order_amount) reg_order_amount,
+    sum(view_count) view_count,
+    sum(click_count) click_count,
+    sum(follow_user) follow_user,
+    sum(da1),sum(da2),sum(da3),sum(da4),sum(da5),sum(da6),sum(da7),sum(da8),sum(da9),sum(da10),sum(da11),sum(da12),sum(da13),sum(da14),sum(da15),sum(da16),sum(da17),sum(da18),sum(da19),sum(da20),sum(da21),sum(da22),sum(da23),sum(da24),sum(da25),sum(da26),sum(da27),sum(da28),sum(da29),sum(da30),sum(da31),sum(da32),sum(da33),sum(da34),sum(da35),sum(da36),sum(da37),sum(da38),sum(da39),sum(da40),sum(da41),sum(da42),sum(da43),sum(da44),sum(da45),sum(da46),sum(da47),sum(da48),sum(da49),sum(da50),sum(da51),sum(da52),sum(da53),sum(da54),sum(da55),sum(da56),sum(da57),sum(da58),sum(da59),sum(da60),sum(dm3),sum(dm4),sum(dm5),
+    sum(ba1),sum(ba2),sum(ba3),sum(ba4),sum(ba5),sum(ba6),sum(ba7)
+    
+    from dw_channel a 
+    left join dw_channel_amount_daily b on a.dt=b.dt and a.channel=b.channel 
+    left join dw_channel_amount_daily_reverse c on a.dt=c.dt and a.channel=c.channel 
+    where a.book!=''
+    GROUP BY dt,book,type"""
+
+
+    dm_db=DB(config=using_config.dm)
+    dm_db.session.execute(sql)
+    dm_db.session.commit()
     logging.info('书籍趋势数据处理,结束')
 
 

+ 34 - 1
app/etl/dw/dw_image_cost_day.py

@@ -2,8 +2,10 @@ import logging
 from model.DataBaseUtils import MysqlUtils, CkUtils
 from model.DateUtils import DateUtils
 from model.DingTalkUtils import DingTalkUtils
-# logging.getLogger().setLevel(logging.WARNING)
 import pandas as pd
+from datetime import datetime
+# logging.getLogger().setLevel(logging.WARNING)
+
 
 db = MysqlUtils()
 ck = CkUtils()
@@ -118,6 +120,37 @@ def run(dt):
         data)
 
 
+
+    #ck对应数据也保存一份
+    ck.execute(f'''
+     alter table dw_image_cost_day drop partition '{dt}'
+    ''')
+    col = ['dt', 'type', 'use_times', 'cost', 'view_count', 'click_count',
+           'follow_count', 'order_count', 'order_amount', 'title', 'description',
+           'book', 'platform', 'stage', 'channel', 'pitcher', 'image_id', 'preview_url',
+           'signature', 'is_video', 'width', 'height', 'size', 'format', 'video_length',
+           'video_bit_rate', 'video_meta_data', 'download_path']
+    #ck存入前进行数据格式化
+    for _ in data:
+        # data= [col if col else 'null' for col in data ]
+        _[0] = datetime.strptime(_[0], '%Y-%m-%d')
+        _[1] = str(_[1]) if _[1] is not None else None
+        _[2] = int(_[2]) if _[2] is not None else None
+        _[3] = float(_[3]) if _[3] is not None else None
+        _[4] = int(_[4]) if _[4] is not None else None
+        _[5] = int(_[5]) if _[5] is not None else None
+        _[6] = int(_[6]) if _[6] is not None else None
+        _[7] = int(_[7]) if _[7] is not None else None
+        _[8] = float(_[8]) if _[8] is not None else None
+        _[19] = str(_[19])
+        _[20] = str(_[20]) if _[20] is not None else None
+        _[21] = str(_[21]) if _[21] is not None else None
+        _[22] = str(_[22]) if _[22] is not None else None
+        _[23] = str(_[23]) if _[23] is not None else None
+
+    col_str = ','.join(col)
+    ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str), data)
+
 def hourly():
     try:
         logging.info('广告数据清洗,开始')

+ 9 - 9
app/etl/dw/dw_pitcher_daily.py

@@ -2,19 +2,18 @@
 @desc 投手维度全量表
 @auth ck
 """
-from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils, CkUtils
+from model.sql_models import DB
+from config import using_config
 import logging
 
-du = DateUtils()
-db = MysqlUtils()
-ck = CkUtils()
-
 
 def dw_pitcher_trend():
     logging.info('投手趋势数据处理,开始')
     logging.info('run> dw_pitcher_trend')
-    sql = """insert into dw_pitcher_trend 
+    sql = """
+    truncate table dw_pitcher_trend;
+    
+    insert into dw_pitcher_trend 
     select a.dt,pitcher,
        sum(cost),
        sum(order_amount),
@@ -39,8 +38,9 @@ left join src_book_info c on a.book=c.book and a.platform=c.platform and a.type=
 left join dw_channel_amount_daily_reverse d on a.dt=d.dt and a.channel=d.channel 
 group by pitcher,a.dt"""
 
-    db.dm.execute("truncate table dw_pitcher_trend")
-    db.dm.execute(sql)
+    dm_db = DB(config=using_config.dm)
+    dm_db.session.execute(sql)
+    dm_db.session.commit()
     logging.info('投手趋势数据处理,结束')
 
 

+ 0 - 0
data_processing/__init__.py


+ 0 - 1
model/DataBaseUtils.py

@@ -162,7 +162,6 @@ class CkUtils:
         """
         max = 1000
         sql = "insert into {} ({}) values ".format(table, col)
-
         if len(data) == 0:
             logging.info("data.len==0")
             return

+ 2 - 1
model/sql_models.py

@@ -1,5 +1,6 @@
 from sqlalchemy import create_engine, MetaData
 from sqlalchemy.orm import sessionmaker, scoped_session
+from pymysql.constants.CLIENT import MULTI_STATEMENTS
 from config import using_config
 import urllib
 
@@ -20,7 +21,7 @@ class DB():
                                                                          database=self.config['database'])
 
         engine = create_engine(db_uri, pool_size=1024, pool_recycle=1800,
-                               pool_pre_ping=True, max_overflow=100, echo=False)
+                               pool_pre_ping=True, max_overflow=100, echo=False,connect_args={"client_flag": MULTI_STATEMENTS})
         return engine
 
     def DBSession(self):