浏览代码

cost_minute 写入 ck改成 批量写入

wcc 3 年之前
父节点
当前提交
06d37dc930

+ 7 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -1,10 +1,9 @@
 package flink.zanxiangnet.ad.monitoring;
 package flink.zanxiangnet.ad.monitoring;
 
 
-import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkHour;
-import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkMinute;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.process.*;
 import flink.zanxiangnet.ad.monitoring.process.*;
 import flink.zanxiangnet.ad.monitoring.sink.AdHourDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.AdHourDMToCkBatchSink;
+import flink.zanxiangnet.ad.monitoring.sink.AdMinuteDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
@@ -89,14 +88,15 @@ public class AdHourStreamJob {
                 adMinuteDWDStream
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
                         .keyBy(AdStatOfMinuteDWD::getAdId)
                         .process(new CostMinuteProcess());
                         .process(new CostMinuteProcess());
-
-        clickhouseMinuteDmStream.addSink(new BatchSinkMinute())
+        new BatchStream<>("adMinuteDMStream", clickhouseMinuteDmStream, 1000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new AdMinuteDMToCkBatchSink())
                 .name("sink_ad_minute_dm_clickhouse");
                 .name("sink_ad_minute_dm_clickhouse");
 
 
         // 小时流
         // 小时流
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 小时流-写入原始表
         // 小时流-写入原始表
-        new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 4000L, 5 * 60 * 1000L)
+        new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 4000L, 3 * 60 * 1000L)
                 .toBatch()
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
                 .name("sink_ad_hour_ods");
                 .name("sink_ad_hour_ods");
@@ -108,7 +108,7 @@ public class AdHourStreamJob {
                         .process(new AdHourDWDProcess());
                         .process(new AdHourDWDProcess());
 
 
         // 小时流-写入maxCompute
         // 小时流-写入maxCompute
-        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 3 * 60 * 1000L)
                 .toBatch()
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .name("sink_ad_hour_dwd");
                 .name("sink_ad_hour_dwd");
@@ -124,7 +124,7 @@ public class AdHourStreamJob {
                 adHourDWDAllStream
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .process(new CostHourProcess());
                         .process(new CostHourProcess());
-        new BatchStream<>("adHourDMStream", adHourDMStream, 4000L, 60 * 1000L)
+        new BatchStream<>("adHourDMStream", adHourDMStream, 1000L, 60 * 1000L)
                 .toBatch()
                 .toBatch()
                 .addSink(new AdHourDMToCkBatchSink())
                 .addSink(new AdHourDMToCkBatchSink())
                 .name("sink_ad_hour_dm_clickhouse");
                 .name("sink_ad_hour_dm_clickhouse");

+ 13 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/CostMinuterDMMapper.java

@@ -0,0 +1,13 @@
+package flink.zanxiangnet.ad.monitoring.dao.mapper;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface CostMinuterDMMapper {
+
+    int add(@Param("item") CostMinuterDM item);
+
+    int addBatch(@Param("list") List<CostMinuterDM> list);
+}

+ 62 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/CostMinuterDMMapper.xml

@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="flink.zanxiangnet.ad.monitoring.dao.mapper.CostMinuterDMMapper">
+
+    <sql id="Base_Column_List">
+        dt, minute, create_time,
+        hour, ad_id, adgroup_id,
+        adcreative_id, account_id, campaign_id,
+        cost_minute, cost_diff, cost_last_hour,
+        cost_last_hour_diff, cost_last_two_hour, cost_last_two_hour_diff,
+        cost_last_three_trend, cost_speed, view_count_minute,
+        thousand_display_price_minute, valid_click_count_minute, ctr_minute,
+        cpc_minute, conversions_count_minute, conversions_cost_minute,
+        conversions_rate_minute, first_day_order_roi_minute, first_day_order_amount_minute,
+        first_day_order_count_minute, web_order_amount_minute, web_order_cost_minute,
+        web_order_rate_minute, web_order_count_minute, order_roi_minute,
+        order_unit_price_minute, from_follow_uv_minute, from_follow_cost_minute,
+        from_follow_rate_minute, web_register_count_minute, web_register_uv_minute,
+        web_register_cost_minute, agency_account_id
+    </sql>
+
+    <insert id="add">
+        INSERT INTO cost_minute(<include refid="Base_Column_List"/>)
+        VALUES
+        (#{item.dt}, #{item.createTime}, #{item.campaignId},
+        #{item.hour}, #{item.minute}, #{item.adId},
+        #{item.agencyAccountId}, #{item.adgroupId}, #{item.adcreativeId},
+        #{item.accountId}, #{item.costMinute}, #{item.costDiff},
+        #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour},
+        #{item.costLastTwoHourDiff}, #{item.costLastThreeTrend}, #{item.costSpeed},
+        #{item.viewCountMinute}, #{item.thousandDisplayPriceMinute}, #{item.validClickCountMinute},
+        #{item.ctrMinute}, #{item.cpcMinute}, #{item.conversionsCountMinute},
+        #{item.conversionsCostMinute}, #{item.conversionsRateMinute}, #{item.firstDayOrderRoiMinute},
+        #{item.firstDayOrderAmountMinute}, #{item.firstDayOrderCountMinute}, #{item.webOrderAmountMinute},
+        #{item.webOrderCostMinute}, #{item.webOrderRateMinute}, #{item.webOrderCountMinute},
+        #{item.orderRoiMinute}, #{item.orderUnitPriceMinute}, #{item.fromFollowUvMinute},
+        #{item.fromFollowCostMinute}, #{item.fromFollowRateMinute}, #{item.webRegisterCountMinute},
+        #{item.webRegisterUvMinute}, #{item.webRegisterCostMinute})
+    </insert>
+
+    <insert id="addBatch">
+        INSERT INTO cost_minute(<include refid="Base_Column_List"/>)
+        VALUES
+        <foreach collection="list" index="index" item="item" separator=",">
+            (#{item.dt}, #{item.createTime}, #{item.campaignId},
+            #{item.hour}, #{item.minute}, #{item.adId},
+            #{item.agencyAccountId}, #{item.adgroupId}, #{item.adcreativeId},
+            #{item.accountId}, #{item.costMinute}, #{item.costDiff},
+            #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour},
+            #{item.costLastTwoHourDiff}, #{item.costLastThreeTrend}, #{item.costSpeed},
+            #{item.viewCountMinute}, #{item.thousandDisplayPriceMinute}, #{item.validClickCountMinute},
+            #{item.ctrMinute}, #{item.cpcMinute}, #{item.conversionsCountMinute},
+            #{item.conversionsCostMinute}, #{item.conversionsRateMinute}, #{item.firstDayOrderRoiMinute},
+            #{item.firstDayOrderAmountMinute}, #{item.firstDayOrderCountMinute}, #{item.webOrderAmountMinute},
+            #{item.webOrderCostMinute}, #{item.webOrderRateMinute}, #{item.webOrderCountMinute},
+            #{item.orderRoiMinute}, #{item.orderUnitPriceMinute}, #{item.fromFollowUvMinute},
+            #{item.fromFollowCostMinute}, #{item.fromFollowRateMinute}, #{item.webRegisterCountMinute},
+            #{item.webRegisterUvMinute}, #{item.webRegisterCostMinute})
+        </foreach>
+    </insert>
+</mapper>

+ 0 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostMinuterDM.java

@@ -14,8 +14,6 @@ public class CostMinuterDM {
     public String hour;
     public String hour;
     //时间-分钟
     //时间-分钟
     public String minute;
     public String minute;
-    //时间-真实
-    public String createtime;
     //广告id
     //广告id
     public String adId;
     public String adId;
     //服务商id
     //服务商id

+ 75 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdMinuteDMToCkBatchSink.java

@@ -0,0 +1,75 @@
+package flink.zanxiangnet.ad.monitoring.sink;
+
+import com.aliyun.odps.tunnel.TunnelException;
+import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.dao.mapper.CostMinuterDMMapper;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.ibatis.datasource.DataSourceFactory;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * 批量数据写出
+ */
+@Slf4j
+public class AdMinuteDMToCkBatchSink extends RichSinkFunction<List<CostMinuterDM>> {
+
+    private SqlSessionFactory sqlSessionFactory;
+
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
+
+        Properties ckProps = new Properties();
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+
+        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
+        dataSourceFactory.setProperties(ckProps);
+        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
+        // 开启驼峰规则
+        configuration.setMapUnderscoreToCamelCase(true);
+        configuration.getTypeAliasRegistry().registerAlias(CostMinuterDM.class);
+        // addMapper一定要放到 alias的后面!!!!!
+        configuration.addMapper(CostMinuterDMMapper.class);
+        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
+    }
+
+    /**
+     * 将值写入到 Sink。每个值都会调用此函数
+     *
+     * @param value
+     * @param context
+     */
+    @Override
+    public void invoke(List<CostMinuterDM> value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            CostMinuterDMMapper mapper = session.getMapper(CostMinuterDMMapper.class);
+            mapper.addBatch(value);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+}