Pārlūkot izejas kodu

costhour写入 ck改成 批量写入

wcc 3 gadi atpakaļ
vecāks
revīzija
40a0a9c200

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -6,7 +6,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollMonthProcess;
 import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollYearProcess;
-import flink.zanxiangnet.ad.monitoring.sink.AdDWDToCkBatchSink;
+import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
@@ -96,7 +96,7 @@ public class AdDayStreamJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .name("sink_ad_year_dwd");
         // 写入 ck
-        new BatchStream<>("adDWDToCkStream", adDayStream, 1000L, 60 * 1000L).toBatch().addSink(new AdDWDToCkBatchSink());
+        new BatchStream<>("adDWDToCkStream", adDayStream, 1000L, 60 * 1000L).toBatch().addSink(new AdDayDWDToCkBatchSink());
 
         env.execute("ad_day_stream_job");
     }

+ 10 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -4,7 +4,9 @@ import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkHour;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkMinute;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.process.*;
+import flink.zanxiangnet.ad.monitoring.sink.AdHourDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
+import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
@@ -86,11 +88,10 @@ public class AdHourStreamJob {
         SingleOutputStreamOperator<CostMinuterDM> clickhouseMinuteDmStream =
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
-                        .process(new CostMinuteProcess())
-                        .name("sink_ad_minute_dm_clickhouse");
+                        .process(new CostMinuteProcess());
 
-        BatchSinkMinute batchSinkMinute = new BatchSinkMinute();
-        clickhouseMinuteDmStream.addSink(batchSinkMinute);
+        clickhouseMinuteDmStream.addSink(new BatchSinkMinute())
+                .name("sink_ad_minute_dm_clickhouse");
 
         // 小时流
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
@@ -119,13 +120,14 @@ public class AdHourStreamJob {
 
         //小时流-写入 ck
         DataStream<AdStatOfHourDWD> adHourDWDAllStream = hourStreamFromMinute.union(adHourDWDStream);
-        SingleOutputStreamOperator<CostHourDM> clickhouseHourDmStream =
+        SingleOutputStreamOperator<CostHourDM> adHourDMStream =
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .process(new CostHourProcess());
-
-        BatchSinkHour batchSinkhour = new BatchSinkHour();
-        clickhouseHourDmStream.addSink(batchSinkhour).name("sink_ad_hour_dm_clickhouse");
+        new BatchStream<>("adHourDMStream", adHourDMStream, 4000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new AdHourDMToCkBatchSink())
+                .name("sink_ad_hour_dm_clickhouse");
 
         env.execute("ad_hour_stream_job");
     }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -6,7 +6,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.process.PlanDayDWDRollMonthProcess;
 import flink.zanxiangnet.ad.monitoring.process.PlanDayDWDRollYearProcess;
-import flink.zanxiangnet.ad.monitoring.sink.PlanDWDToCkBatchSink;
+import flink.zanxiangnet.ad.monitoring.sink.PlanDayDWDToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
@@ -90,7 +90,7 @@ public class PlanDayStreamJob {
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))
                 .name("sink_plan_year_dwd");
         // 写入 ck
-        new BatchStream<>("planDWDToCkStream", planDayDWDStream, 1000L, 60 * 1000L).toBatch().addSink(new PlanDWDToCkBatchSink());
+        new BatchStream<>("planDWDToCkStream", planDayDWDStream, 1000L, 60 * 1000L).toBatch().addSink(new PlanDayDWDToCkBatchSink());
 
         env.execute("plan_day_stream_job");
     }

+ 89 - 89
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -126,95 +126,96 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?)";
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
-        preparedStatement.setString(1, costhour.dt);
-        preparedStatement.setString(2, costhour.createTime);
-        preparedStatement.setString(3, costhour.hour);
-        preparedStatement.setString(4, costhour.adId);
-        preparedStatement.setString(5, costhour.adgroupId);
-        preparedStatement.setString(6, costhour.adcreativeId);
-        preparedStatement.setString(7, costhour.accountId);
-        preparedStatement.setString(8, costhour.campaignId);
-        preparedStatement.setLong(9, costhour.costTotal);
-        preparedStatement.setLong(10, costhour.costDay);
-        preparedStatement.setLong(11, costhour.costHour);
-        preparedStatement.setLong(12, costhour.costDiff);
-        preparedStatement.setLong(13, costhour.costLastHour);
-        preparedStatement.setLong(14, costhour.costLastHourDiff);
-        preparedStatement.setLong(15, costhour.costLastTwoHour);
-        preparedStatement.setLong(16, costhour.costLastTwoHourDiff);
-        preparedStatement.setLong(17, costhour.costLastThreeTrend);
-        preparedStatement.setDouble(18, costhour.costSpeed);
-        preparedStatement.setLong(19, costhour.viewCountTotal);
-        preparedStatement.setLong(20, costhour.viewCountDay);
-        preparedStatement.setLong(21, costhour.viewCountHour);
-        preparedStatement.setLong(22, costhour.thousandDisplayPriceAll);
-        preparedStatement.setLong(23, costhour.thousandDisplayPriceDay);
-        preparedStatement.setLong(24, costhour.thousandDisplayPriceHour);
-        preparedStatement.setLong(25, costhour.validClickCountTotal);
-        preparedStatement.setLong(26, costhour.validClickCountDay);
-        preparedStatement.setLong(27, costhour.validClickCountHour);
-        preparedStatement.setDouble(28, costhour.ctrAll);
-        preparedStatement.setDouble(29, costhour.ctrDay);
-        preparedStatement.setDouble(30, costhour.ctrHour);
-        preparedStatement.setLong(31, costhour.cpcAll);
-        preparedStatement.setLong(32, costhour.cpcDay);
-        preparedStatement.setLong(33, costhour.cpcHour);
-        preparedStatement.setLong(34, costhour.conversionsCountTotal);
-        preparedStatement.setLong(35, costhour.conversionsCountDay);
-        preparedStatement.setLong(36, costhour.conversionsCountHour);
-        preparedStatement.setLong(37, costhour.conversionsCostTotal);
-        preparedStatement.setLong(38, costhour.conversionsCostDay);
-        preparedStatement.setLong(39, costhour.conversionsCostHour);
-        preparedStatement.setDouble(40, costhour.conversionsRateAll);
-        preparedStatement.setDouble(41, costhour.conversionsRateDay);
-        preparedStatement.setDouble(42, costhour.conversionsRateHour);
-        preparedStatement.setDouble(43, costhour.firstDayOrderRoiTotal);
-        preparedStatement.setDouble(44, costhour.firstDayOrderRoiDay);
-        preparedStatement.setDouble(45, costhour.firstDayOrderRoiHour);
-        preparedStatement.setLong(46, costhour.firstDayOrderAmountTotal);
-        preparedStatement.setLong(47, costhour.firstDayOrderAmountDay);
-        preparedStatement.setLong(48, costhour.firstDayOrderAmountHour);
-        preparedStatement.setLong(49, costhour.firstDayOrderCountTotal);
-        preparedStatement.setLong(50, costhour.firstDayOrderCountDay);
-        preparedStatement.setLong(51, costhour.firstDayOrderCountHour);
-        preparedStatement.setLong(52, costhour.webOrderAmountTotal);
-        preparedStatement.setLong(53, costhour.webOrderAmountDay);
-        preparedStatement.setLong(54, costhour.webOrderAmountHour);
-        preparedStatement.setLong(55, costhour.webOrderCostTotal);
-        preparedStatement.setLong(56, costhour.webOrderCostDay);
-        preparedStatement.setLong(57, costhour.webOrderCostHour);
-        preparedStatement.setDouble(58, costhour.webOrderRateTotal);
-        preparedStatement.setDouble(59, costhour.webOrderRateDay);
-        preparedStatement.setDouble(60, costhour.webOrderRateHour);
-        preparedStatement.setLong(61, costhour.webOrderCountTotal);
-        preparedStatement.setLong(62, costhour.webOrderCountDay);
-        preparedStatement.setLong(63, costhour.webOrderCountHour);
-        preparedStatement.setDouble(64, costhour.orderRoiTotal);
-        preparedStatement.setDouble(65, costhour.orderRoiDay);
-        preparedStatement.setDouble(66, costhour.orderRoiHour);
-        preparedStatement.setLong(67, costhour.orderUnitPriceTotal);
-        preparedStatement.setLong(68, costhour.orderUnitPriceDay);
-        preparedStatement.setLong(69, costhour.orderUnitPriceHour);
-        preparedStatement.setLong(70, costhour.fromFollowUvTotal);
-        preparedStatement.setLong(71, costhour.fromFollowUvDay);
-        preparedStatement.setLong(72, costhour.fromFollowUvHour);
-        preparedStatement.setLong(73, costhour.fromFollowCostTotal);
-        preparedStatement.setLong(74, costhour.fromFollowCostDay);
-        preparedStatement.setLong(75, costhour.fromFollowCostHour);
-        preparedStatement.setDouble(76, costhour.fromFollowRateTotal);
-        preparedStatement.setDouble(77, costhour.fromFollowRateDay);
-        preparedStatement.setDouble(78, costhour.fromFollowRateHour);
-        preparedStatement.setLong(79, costhour.webRegisterCountTotal);
-        preparedStatement.setLong(80, costhour.webRegisterCountDay);
-        preparedStatement.setLong(81, costhour.webRegisterCountHour);
-        preparedStatement.setLong(82, costhour.webRegisterUvTotal);
-        preparedStatement.setLong(83, costhour.webRegisterUvDay);
-        preparedStatement.setLong(84, costhour.webRegisterUvHour);
-        preparedStatement.setDouble(85, costhour.webRegisterCostTotal);
-        preparedStatement.setDouble(86, costhour.webRegisterCostDay);
-        preparedStatement.setDouble(87, costhour.webRegisterCostHour);
-        preparedStatement.setString(88, costhour.agencyAccountId);
+        preparedStatement.setString(1, costhour.getDt());
+        preparedStatement.setString(2, costhour.getCreateTime());
+        preparedStatement.setString(3, costhour.getHour());
+        preparedStatement.setString(4, costhour.getAdId());
+        preparedStatement.setString(5, costhour.getAdgroupId());
+        preparedStatement.setString(6, costhour.getAdcreativeId());
+        preparedStatement.setString(7, costhour.getAccountId());
+        preparedStatement.setString(8, costhour.getCampaignId());
+        preparedStatement.setLong(9, costhour.getCostTotal());
+        preparedStatement.setLong(10, costhour.getCostDay());
+        preparedStatement.setLong(11, costhour.getCostHour());
+        preparedStatement.setLong(12, costhour.getCostDiff());
+        preparedStatement.setLong(13, costhour.getCostLastHour());
+        preparedStatement.setLong(14, costhour.getCostLastHourDiff());
+        preparedStatement.setLong(15, costhour.getCostLastTwoHour());
+        preparedStatement.setLong(16, costhour.getCostLastTwoHourDiff());
+        preparedStatement.setLong(17, costhour.getCostLastThreeTrend());
+        preparedStatement.setDouble(18, costhour.getCostSpeed());
+        preparedStatement.setLong(19, costhour.getViewCountTotal());
+        preparedStatement.setLong(20, costhour.getViewCountDay());
+        preparedStatement.setLong(21, costhour.getViewCountHour());
+        preparedStatement.setLong(22, costhour.getThousandDisplayPriceAll());
+        preparedStatement.setLong(23, costhour.getThousandDisplayPriceDay());
+        preparedStatement.setLong(24, costhour.getThousandDisplayPriceHour());
+        preparedStatement.setLong(25, costhour.getValidClickCountTotal());
+        preparedStatement.setLong(26, costhour.getValidClickCountDay());
+        preparedStatement.setLong(27, costhour.getValidClickCountHour());
+        preparedStatement.setDouble(28, costhour.getCtrAll());
+        preparedStatement.setDouble(29, costhour.getCtrDay());
+        preparedStatement.setDouble(30, costhour.getCtrHour());
+        preparedStatement.setLong(31, costhour.getCpcAll());
+        preparedStatement.setLong(32, costhour.getCpcDay());
+        preparedStatement.setLong(33, costhour.getCpcHour());
+        preparedStatement.setLong(34, costhour.getConversionsCountTotal());
+        preparedStatement.setLong(35, costhour.getConversionsCountDay());
+        preparedStatement.setLong(36, costhour.getConversionsCountHour());
+        preparedStatement.setLong(37, costhour.getConversionsCostTotal());
+        preparedStatement.setLong(38, costhour.getConversionsCostDay());
+        preparedStatement.setLong(39, costhour.getConversionsCostHour());
+        preparedStatement.setDouble(40, costhour.getConversionsRateAll());
+        preparedStatement.setDouble(41, costhour.getConversionsRateDay());
+        preparedStatement.setDouble(42, costhour.getConversionsRateHour());
+        preparedStatement.setDouble(43, costhour.getFirstDayOrderRoiTotal());
+        preparedStatement.setDouble(44, costhour.getFirstDayOrderRoiDay());
+        preparedStatement.setDouble(45, costhour.getFirstDayOrderRoiHour());
+        preparedStatement.setLong(46, costhour.getFirstDayOrderAmountTotal());
+        preparedStatement.setLong(47, costhour.getFirstDayOrderAmountDay());
+        preparedStatement.setLong(48, costhour.getFirstDayOrderAmountHour());
+        preparedStatement.setLong(49, costhour.getFirstDayOrderCountTotal());
+        preparedStatement.setLong(50, costhour.getFirstDayOrderCountDay());
+        preparedStatement.setLong(51, costhour.getFirstDayOrderCountHour());
+        preparedStatement.setLong(52, costhour.getWebOrderAmountTotal());
+        preparedStatement.setLong(53, costhour.getWebOrderAmountDay());
+        preparedStatement.setLong(54, costhour.getWebOrderAmountHour());
+        preparedStatement.setLong(55, costhour.getWebOrderCostTotal());
+        preparedStatement.setLong(56, costhour.getWebOrderCostDay());
+        preparedStatement.setLong(57, costhour.getWebOrderCostHour());
+        preparedStatement.setDouble(58, costhour.getWebOrderRateTotal());
+        preparedStatement.setDouble(59, costhour.getWebOrderRateDay());
+        preparedStatement.setDouble(60, costhour.getWebOrderRateHour());
+        preparedStatement.setLong(61, costhour.getWebOrderCountTotal());
+        preparedStatement.setLong(62, costhour.getWebOrderCountDay());
+        preparedStatement.setLong(63, costhour.getWebOrderCountHour());
+        preparedStatement.setDouble(64, costhour.getOrderRoiTotal());
+        preparedStatement.setDouble(65, costhour.getOrderRoiDay());
+        preparedStatement.setDouble(66, costhour.getOrderRoiHour());
+        preparedStatement.setLong(67, costhour.getOrderUnitPriceTotal());
+        preparedStatement.setLong(68, costhour.getOrderUnitPriceDay());
+        preparedStatement.setLong(69, costhour.getOrderUnitPriceHour());
+        preparedStatement.setLong(70, costhour.getFromFollowUvTotal());
+        preparedStatement.setLong(71, costhour.getFromFollowUvDay());
+        preparedStatement.setLong(72, costhour.getFromFollowUvHour());
+        preparedStatement.setLong(73, costhour.getFromFollowCostTotal());
+        preparedStatement.setLong(74, costhour.getFromFollowCostDay());
+        preparedStatement.setLong(75, costhour.getFromFollowCostHour());
+        preparedStatement.setDouble(76, costhour.getFromFollowRateTotal());
+        preparedStatement.setDouble(77, costhour.getFromFollowRateDay());
+        preparedStatement.setDouble(78, costhour.getFromFollowRateHour());
+        preparedStatement.setLong(79, costhour.getWebRegisterCountTotal());
+        preparedStatement.setLong(80, costhour.getWebRegisterCountDay());
+        preparedStatement.setLong(81, costhour.getWebRegisterCountHour());
+        preparedStatement.setLong(82, costhour.getWebRegisterUvTotal());
+        preparedStatement.setLong(83, costhour.getWebRegisterUvDay());
+        preparedStatement.setLong(84, costhour.getWebRegisterUvHour());
+        preparedStatement.setDouble(85, costhour.getWebRegisterCostTotal());
+        preparedStatement.setDouble(86, costhour.getWebRegisterCostDay());
+        preparedStatement.setDouble(87, costhour.getWebRegisterCostHour());
+        preparedStatement.setString(88, costhour.getAgencyAccountId());
         preparedStatement.addBatch();
+        preparedStatement.executeBatch();
         connection.commit();
         //clickhouse 处理重复数据
         //TODO:数据去重有问题,去除掉非最新的数据
@@ -222,7 +223,6 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
 //        String sql_duplicate = "optimize table data_monitoring.cost_hour final;";
 //        statement_duplicate.executeQuery(sql_duplicate);
 //        connection.commit();
-        long endTime_dp = System.currentTimeMillis();
 
     }
 

+ 13 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/CostHourDMMapper.java

@@ -0,0 +1,13 @@
+package flink.zanxiangnet.ad.monitoring.dao.mapper;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface CostHourDMMapper {
+
+    int add(@Param("item") CostHourDM item);
+
+    int addBatch(@Param("list") List<CostHourDM> list);
+}

+ 88 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/CostHourDMMapper.xml

@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="flink.zanxiangnet.ad.monitoring.dao.mapper.CostHourDMMapper">
+
+    <sql id="Base_Column_List">
+        dt, create_time, hour, ad_id,
+        adgroup_id, adcreative_id, account_id, campaign_id,
+        cost_total, cost_day, cost_hour, cost_diff,
+        cost_last_hour, cost_last_hour_diff, cost_last_two_hour, cost_last_two_hour_diff,
+        cost_last_three_trend, cost_speed, view_count_total, view_count_day,
+        view_count_hour, thousand_display_price_all, thousand_display_price_day, thousand_display_price_hour,
+        valid_click_count_total, valid_click_count_day, valid_click_count_hour, ctr_all,
+        ctr_day, ctr_hour, cpc_all, cpc_day,
+        cpc_hour, conversions_count_total, conversions_count_day, conversions_count_hour,
+        conversions_cost_total, conversions_cost_day, conversions_cost_hour, conversions_rate_all,
+        conversions_rate_day, conversions_rate_hour, first_day_order_roi_total, first_day_order_roi_day,
+        first_day_order_roi_hour, first_day_order_amount_total, first_day_order_amount_day, first_day_order_amount_hour,
+        first_day_order_count_total, first_day_order_count_day, first_day_order_count_hour, web_order_amount_total,
+        web_order_amount_day, web_order_amount_hour, web_order_cost_total, web_order_cost_day,
+        web_order_cost_hour, web_order_rate_total, web_order_rate_day, web_order_rate_hour,
+        web_order_count_total, web_order_count_day, web_order_count_hour, order_roi_total,
+        order_roi_day, order_roi_hour, order_unit_price_total, order_unit_price_day,
+        order_unit_price_hour, from_follow_uv_total, from_follow_uv_day, from_follow_uv_hour,
+        from_follow_cost_total, from_follow_cost_day, from_follow_cost_hour, from_follow_rate_total,
+        from_follow_rate_day, from_follow_rate_hour, web_register_count_total, web_register_count_day,
+        web_register_count_hour, web_register_uv_total, web_register_uv_day, web_register_uv_hour,
+        web_register_cost_total, web_register_cost_day, web_register_cost_hour, agency_account_id
+    </sql>
+
+    <insert id="add">
+        INSERT INTO cost_hour(<include refid="Base_Column_List"/>)
+        VALUES
+        (#{item.dt}, #{item.createTime}, #{item.hour}, #{item.adId},
+        #{item.adgroupId}, #{item.accountId}, #{item.adcreativeId}, #{item.campaignId},
+        #{item.costTotal}, #{item.costDay}, #{item.costHour}, #{item.costDiff},
+        #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour}, #{item.costLastTwoHourDiff},
+        #{item.costLastThreeTrend}, #{item.costSpeed}, #{item.viewCountTotal}, #{item.viewCountDay},
+        #{item.viewCountHour}, #{item.thousandDisplayPriceAll}, #{item.thousandDisplayPriceDay}, #{item.thousandDisplayPriceHour},
+        #{item.validClickCountTotal}, #{item.validClickCountDay}, #{item.validClickCountHour}, #{item.ctrAll},
+        #{item.ctrDay}, #{item.ctrHour}, #{item.cpcAll}, #{item.cpcDay},
+        #{item.cpcHour}, #{item.conversionsCountTotal}, #{item.conversionsCountDay}, #{item.conversionsCountHour},
+        #{item.conversionsCostTotal}, #{item.conversionsCostDay}, #{item.conversionsCostHour}, #{item.conversionsRateAll},
+        #{item.conversionsRateDay}, #{item.conversionsRateHour}, #{item.firstDayOrderRoiTotal}, #{item.firstDayOrderRoiDay},
+        #{item.firstDayOrderRoiHour}, #{item.firstDayOrderAmountTotal}, #{item.firstDayOrderAmountDay}, #{item.firstDayOrderAmountHour},
+        #{item.firstDayOrderCountTotal}, #{item.firstDayOrderCountDay}, #{item.firstDayOrderCountHour}, #{item.webOrderAmountTotal},
+        #{item.webOrderAmountDay}, #{item.webOrderAmountHour}, #{item.webOrderCostTotal}, #{item.webOrderCostDay},
+        #{item.webOrderCostHour}, #{item.webOrderRateTotal}, #{item.webOrderRateDay}, #{item.webOrderRateHour},
+        #{item.webOrderCountTotal}, #{item.webOrderCountDay}, #{item.webOrderCountHour}, #{item.orderRoiTotal},
+        #{item.orderRoiDay}, #{item.orderRoiHour}, #{item.orderUnitPriceTotal}, #{item.orderUnitPriceDay},
+        #{item.orderUnitPriceHour}, #{item.fromFollowUvTotal}, #{item.fromFollowUvDay}, #{item.fromFollowUvHour},
+        #{item.fromFollowCostTotal}, #{item.fromFollowCostDay}, #{item.fromFollowCostHour}, #{item.fromFollowRateTotal},
+        #{item.fromFollowRateDay}, #{item.fromFollowRateHour}, #{item.webRegisterCountTotal}, #{item.webRegisterCountDay},
+        #{item.webRegisterCountHour}, #{item.webRegisterUvTotal}, #{item.webRegisterUvDay}, #{item.webRegisterUvHour},
+        #{item.webRegisterCostTotal}, #{item.webRegisterCostDay}, #{item.webRegisterCostHour}, #{item.agencyAccountId}
+        )
+    </insert>
+
+    <insert id="addBatch">
+        INSERT INTO cost_hour(<include refid="Base_Column_List"/>)
+        VALUES
+        <foreach collection="list" index="index" item="item" separator=",">
+            (#{item.dt}, #{item.createTime}, #{item.hour}, #{item.adId},
+            #{item.adgroupId}, #{item.accountId}, #{item.adcreativeId}, #{item.campaignId},
+            #{item.costTotal}, #{item.costDay}, #{item.costHour}, #{item.costDiff},
+            #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour}, #{item.costLastTwoHourDiff},
+            #{item.costLastThreeTrend}, #{item.costSpeed}, #{item.viewCountTotal}, #{item.viewCountDay},
+            #{item.viewCountHour}, #{item.thousandDisplayPriceAll}, #{item.thousandDisplayPriceDay}, #{item.thousandDisplayPriceHour},
+            #{item.validClickCountTotal}, #{item.validClickCountDay}, #{item.validClickCountHour}, #{item.ctrAll},
+            #{item.ctrDay}, #{item.ctrHour}, #{item.cpcAll}, #{item.cpcDay},
+            #{item.cpcHour}, #{item.conversionsCountTotal}, #{item.conversionsCountDay}, #{item.conversionsCountHour},
+            #{item.conversionsCostTotal}, #{item.conversionsCostDay}, #{item.conversionsCostHour}, #{item.conversionsRateAll},
+            #{item.conversionsRateDay}, #{item.conversionsRateHour}, #{item.firstDayOrderRoiTotal}, #{item.firstDayOrderRoiDay},
+            #{item.firstDayOrderRoiHour}, #{item.firstDayOrderAmountTotal}, #{item.firstDayOrderAmountDay}, #{item.firstDayOrderAmountHour},
+            #{item.firstDayOrderCountTotal}, #{item.firstDayOrderCountDay}, #{item.firstDayOrderCountHour}, #{item.webOrderAmountTotal},
+            #{item.webOrderAmountDay}, #{item.webOrderAmountHour}, #{item.webOrderCostTotal}, #{item.webOrderCostDay},
+            #{item.webOrderCostHour}, #{item.webOrderRateTotal}, #{item.webOrderRateDay}, #{item.webOrderRateHour},
+            #{item.webOrderCountTotal}, #{item.webOrderCountDay}, #{item.webOrderCountHour}, #{item.orderRoiTotal},
+            #{item.orderRoiDay}, #{item.orderRoiHour}, #{item.orderUnitPriceTotal}, #{item.orderUnitPriceDay},
+            #{item.orderUnitPriceHour}, #{item.fromFollowUvTotal}, #{item.fromFollowUvDay}, #{item.fromFollowUvHour},
+            #{item.fromFollowCostTotal}, #{item.fromFollowCostDay}, #{item.fromFollowCostHour}, #{item.fromFollowRateTotal},
+            #{item.fromFollowRateDay}, #{item.fromFollowRateHour}, #{item.webRegisterCountTotal}, #{item.webRegisterCountDay},
+            #{item.webRegisterCountHour}, #{item.webRegisterUvTotal}, #{item.webRegisterUvDay}, #{item.webRegisterUvHour},
+            #{item.webRegisterCostTotal}, #{item.webRegisterCostDay}, #{item.webRegisterCostHour}, #{item.agencyAccountId}
+            )
+        </foreach>
+    </insert>
+</mapper>

+ 91 - 91
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostHourDM.java

@@ -5,180 +5,180 @@ import lombok.Data;
 @Data
 public class CostHourDM {
     //时间-天
-    public String dt;
+    private String dt;
     //真实时间
-    public String createTime;
-    //计划 id
-    public String campaignId;
+    private String createTime;
     //时间-小时
-    public String hour;
+    private String hour;
     //广告id
-    public String adId;
-    //服务商id
-    public String agencyAccountId;
+    private String adId;
     //广告组id
-    public String adgroupId;
-    //创意id
-    public String adcreativeId;
+    private String adgroupId;
     //账号id
-    public String accountId;
+    private String accountId;
+    //创意id
+    private String adcreativeId;
+    //计划 id
+    private String campaignId;
     //总消耗
-    public long costTotal;
+    private long costTotal;
     //当天消耗
-    public long costDay;
+    private long costDay;
     //当天小时消耗
-    public long costHour;
+    private long costHour;
     //当前小时与前一小时消耗差额
-    public long costDiff;
+    private long costDiff;
     //前一小时消耗金额
-    public long costLastHour;
+    private long costLastHour;
     //前一小时与前二小时消耗差额
-    public long costLastHourDiff;
+    private long costLastHourDiff;
     //前二小时消耗金额
-    public long costLastTwoHour;
+    private long costLastTwoHour;
     //前二小时与前三小时消耗差额
-    public long costLastTwoHourDiff;
+    private long costLastTwoHourDiff;
     //前三小时消耗趋势
-    public long costLastThreeTrend;
+    private long costLastThreeTrend;
     //消耗速度
-    public double costSpeed;
+    private double costSpeed;
     //总浏览量
-    public long viewCountTotal;
+    private long viewCountTotal;
     //天-总浏览量
-    public long viewCountDay;
+    private long viewCountDay;
     //小时-总浏览量
-    public long viewCountHour;
+    private long viewCountHour;
     //总平均千次曝光成本
-    public long thousandDisplayPriceAll;
+    private long thousandDisplayPriceAll;
     //天-总平均曝光成本
-    public long thousandDisplayPriceDay;
+    private long thousandDisplayPriceDay;
     //小时-总平均曝光成本
-    public long thousandDisplayPriceHour;
+    private long thousandDisplayPriceHour;
     //总点击量
-    public long validClickCountTotal;
+    private long validClickCountTotal;
     //天-总点击量
-    public long validClickCountDay;
+    private long validClickCountDay;
     //小时-总点击量
-    public long validClickCountHour;
+    private long validClickCountHour;
     //总平均点击率
-    public double ctrAll;
+    private double ctrAll;
     //天-总平均点击率
-    public double ctrDay;
+    private double ctrDay;
     //小时-总平均点击率
-    public double ctrHour;
+    private double ctrHour;
     //总点击均价
-    public long cpcAll;
+    private long cpcAll;
     //天-总点击均价
-    public long cpcDay;
+    private long cpcDay;
     //小时-总点击均价
-    public long cpcHour;
+    private long cpcHour;
     //总目标转化量
-    public long conversionsCountTotal;
+    private long conversionsCountTotal;
     //天-总目标转化量
-    public long conversionsCountDay;
+    private long conversionsCountDay;
     //小时-总目标转化量
-    public long conversionsCountHour;
+    private long conversionsCountHour;
     //总目标平均转化成本
-    public long conversionsCostTotal;
+    private long conversionsCostTotal;
     //天-总目标平均转化成本
-    public long conversionsCostDay;
+    private long conversionsCostDay;
     //小时-总目标平均转化成本
-    public long conversionsCostHour;
+    private long conversionsCostHour;
     //总平均转化率
-    public double conversionsRateAll;
+    private double conversionsRateAll;
     //天-总平均转化率
-    public double conversionsRateDay;
+    private double conversionsRateDay;
     //小时-总平均转化率
-    public double conversionsRateHour;
+    private double conversionsRateHour;
     //总首日下单roi
-    public double firstDayOrderRoiTotal;
+    private double firstDayOrderRoiTotal;
     //天-总首日下单roi
-    public double firstDayOrderRoiDay;
+    private double firstDayOrderRoiDay;
     //小时-总首日下单roi
-    public double firstDayOrderRoiHour;
+    private double firstDayOrderRoiHour;
     //总首日下单金额
-    public long firstDayOrderAmountTotal;
+    private long firstDayOrderAmountTotal;
     //天-总首日下单金额
-    public long firstDayOrderAmountDay;
+    private long firstDayOrderAmountDay;
     //小时-总首日下单金额
-    public long firstDayOrderAmountHour;
+    private long firstDayOrderAmountHour;
     //总首日下单量
-    public long firstDayOrderCountTotal;
+    private long firstDayOrderCountTotal;
     //天-总首日下单量
-    public long firstDayOrderCountDay;
+    private long firstDayOrderCountDay;
     //小时-总首日下单量
-    public long firstDayOrderCountHour;
+    private long firstDayOrderCountHour;
     //总下单金额
-    public long webOrderAmountTotal;
+    private long webOrderAmountTotal;
     //天-总下单金额
-    public long webOrderAmountDay;
+    private long webOrderAmountDay;
     //小时-总下单金额
-    public long webOrderAmountHour;
+    private long webOrderAmountHour;
     //总平均下单成本
-    public long webOrderCostTotal;
+    private long webOrderCostTotal;
     //天-总平均下单成本
-    public long webOrderCostDay;
+    private long webOrderCostDay;
     //小时-总平均下单成本
-    public long webOrderCostHour;
+    private long webOrderCostHour;
     //总平均下单率
-    public double webOrderRateTotal;
+    private double webOrderRateTotal;
     //天-总平均下单率
-    public double webOrderRateDay;
+    private double webOrderRateDay;
     //小时-总平均下单率
-    public double webOrderRateHour;
+    private double webOrderRateHour;
     //总平均下单量
-    public long webOrderCountTotal;
+    private long webOrderCountTotal;
     //天-总平均下单量
-    public long webOrderCountDay;
+    private long webOrderCountDay;
     //小时-总平均下单量
-    public long webOrderCountHour;
+    private long webOrderCountHour;
     //总下单ROI
-    public double orderRoiTotal;
+    private double orderRoiTotal;
     //天-总下单roi
-    public double orderRoiDay;
+    private double orderRoiDay;
     //小时-总下单roi
-    public double orderRoiHour;
+    private double orderRoiHour;
     //总平均下单客单价
-    public long orderUnitPriceTotal;
+    private long orderUnitPriceTotal;
     //天-总平均下单客单价
-    public long orderUnitPriceDay;
+    private long orderUnitPriceDay;
     //小时-总平均下单客单价
-    public long orderUnitPriceHour;
+    private long orderUnitPriceHour;
     //总公众号关注量
-    public long fromFollowUvTotal;
+    private long fromFollowUvTotal;
     //天-总公众号关注量
-    public long fromFollowUvDay;
+    private long fromFollowUvDay;
     //小时-总公众号关注量
-    public long fromFollowUvHour;
+    private long fromFollowUvHour;
     //总平均公众号关注成本
-    public long fromFollowCostTotal;
+    private long fromFollowCostTotal;
     //天-总平均公众号关注成本
-    public long fromFollowCostDay;
+    private long fromFollowCostDay;
     //小时-总平均公众号关注成本
-    public long fromFollowCostHour;
+    private long fromFollowCostHour;
     //总平均公众号关注率
-    public double fromFollowRateTotal;
+    private double fromFollowRateTotal;
     //天-总平均公众号关注率
-    public double fromFollowRateDay;
+    private double fromFollowRateDay;
     //小时-总平均公众号关注率
-    public double fromFollowRateHour;
+    private double fromFollowRateHour;
     //总注册数
-    public long webRegisterCountTotal;
+    private long webRegisterCountTotal;
     //天-总注册数
-    public long webRegisterCountDay;
+    private long webRegisterCountDay;
     //小时-总注册数
-    public long webRegisterCountHour;
+    private long webRegisterCountHour;
     //总注册人数
-    public long webRegisterUvTotal;
+    private long webRegisterUvTotal;
     //天-总注册人数
-    public long webRegisterUvDay;
+    private long webRegisterUvDay;
     //小时-总注册人数
-    public long webRegisterUvHour;
+    private long webRegisterUvHour;
     //总平均注册成本
-    public double webRegisterCostTotal;
+    private double webRegisterCostTotal;
     //天-总平均注册成本
-    public double webRegisterCostDay;
+    private double webRegisterCostDay;
     //小时-总平均注册成本
-    public double webRegisterCostHour;
+    private double webRegisterCostHour;
+    //服务商id
+    private String agencyAccountId;
 
 }

+ 111 - 112
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -34,172 +34,171 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
     //数据格式转换
     public CostHourDM datachange(AdStatOfDayDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
         //时间-天
-        costHourDM.dt = adStatOfMinuteDWD.getStatDay();
+        costHourDM.setDt(adStatOfMinuteDWD.getStatDay());
         //计划 id
-        costHourDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
+        costHourDM.setCampaignId(adStatOfMinuteDWD.getCampaignId().toString());
         //时间- real
-        costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
+        costHourDM.setCreateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime()));
         //时间-小时
         //TODO:之后需要进一步修改
-        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " 23:00:00";
+        costHourDM.setHour(adStatOfMinuteDWD.getStatDay() + " 23:00:00");
         //广告id
-        costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        costHourDM.setAdId(adStatOfMinuteDWD.getAdId().toString());
         //广告组id
-        costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
+        costHourDM.setAdgroupId(adStatOfMinuteDWD.getAdgroupId().toString());;
         //创意id
-        costHourDM.adcreativeId = "";
+        costHourDM.setAdcreativeId("");;
         //账号id
-        costHourDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
+        costHourDM.setAccountId(adStatOfMinuteDWD.getAccountId().toString());
         //总消耗
-        costHourDM.costTotal = adStatOfMinuteDWD.getCostTotal();
+        costHourDM.setCostTotal(adStatOfMinuteDWD.getCostTotal());
         //当天消耗
-        costHourDM.costDay = adStatOfMinuteDWD.getCostDay();
+        costHourDM.setCostDay(adStatOfMinuteDWD.getCostDay());
         //当天小时消耗
         //TODO:数据--小时会缺失,下面的小时数据同理
-        costHourDM.costHour = 0;
-
+        costHourDM.setCostHour(0);
         //消耗速度
         //TODO:如果这个回滚发生在24点有问题,会覆盖掉24点的小时速度
-        costHourDM.costSpeed = 0;
-
+        costHourDM.setCostSpeed(0);
         //总浏览量
-        costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
+        costHourDM.setViewCountTotal(adStatOfMinuteDWD.getViewCountTotal());
         //天-总浏览量
-        costHourDM.viewCountDay = adStatOfMinuteDWD.getViewCountDay();
+        costHourDM.setViewCountDay(adStatOfMinuteDWD.getViewCountDay());
         //小时-总浏览量
-        costHourDM.viewCountHour = 0;
+        costHourDM.setViewCountHour(0);
         //总平均千次曝光成本
-        costHourDM.thousandDisplayPriceAll = adStatOfMinuteDWD.getThousandDisplayPriceAll();
+        costHourDM.setThousandDisplayPriceAll(adStatOfMinuteDWD.getThousandDisplayPriceAll());
         //天-总平均曝光成本
-        costHourDM.thousandDisplayPriceDay = adStatOfMinuteDWD.getThousandDisplayPriceDay();
+        costHourDM.setThousandDisplayPriceDay(adStatOfMinuteDWD.getThousandDisplayPriceDay());
         //小时-总平均曝光成本
-        costHourDM.thousandDisplayPriceHour = 0;
+        costHourDM.setThousandDisplayPriceHour(0);
         //总点击量
-        costHourDM.validClickCountTotal = adStatOfMinuteDWD.getValidClickCountTotal();
+        costHourDM.setValidClickCountTotal(adStatOfMinuteDWD.getValidClickCountTotal());
         //天-总点击量
-        costHourDM.validClickCountDay = adStatOfMinuteDWD.getValidClickCountDay();
+        costHourDM.setValidClickCountDay(adStatOfMinuteDWD.getValidClickCountDay());
         //小时-总点击量
-        costHourDM.validClickCountHour = 0;
+        costHourDM.setValidClickCountHour(0);
         //总平均点击率
-        costHourDM.ctrAll = adStatOfMinuteDWD.getCtrAll();
+        costHourDM.setCtrAll(adStatOfMinuteDWD.getCtrAll());
         //天-总平均点击率
-        costHourDM.ctrDay = adStatOfMinuteDWD.getCtrDay();
+        costHourDM.setCtrDay(adStatOfMinuteDWD.getCtrDay());
         //小时-总平均点击率
-        costHourDM.ctrHour = 0;
+        costHourDM.setCtrHour(0);
         //总点击均价
-        costHourDM.cpcAll = adStatOfMinuteDWD.getCpcAll();
+        costHourDM.setCpcAll(adStatOfMinuteDWD.getCpcAll());
         //天-总点击均价
-        costHourDM.cpcDay = adStatOfMinuteDWD.getCpcDay();
+        costHourDM.setCpcDay(adStatOfMinuteDWD.getCpcDay());
         //小时-总点击均价
-        costHourDM.cpcHour = 0;
+        costHourDM.setCpcHour(0);
         //总目标转化量
-        costHourDM.conversionsCountTotal = adStatOfMinuteDWD.getConversionsCountTotal();
+        costHourDM.setConversionsCountTotal(adStatOfMinuteDWD.getConversionsCountTotal());
         //天-总目标转化量
-        costHourDM.conversionsCountDay = adStatOfMinuteDWD.getConversionsCountDay();
+        costHourDM.setConversionsCountDay(adStatOfMinuteDWD.getConversionsCountDay());
         //小时-总目标转化量
-        costHourDM.conversionsCountHour = 0;
+        costHourDM.setConversionsCountHour(0);
         //总目标平均转化成本
-        costHourDM.conversionsCostTotal = adStatOfMinuteDWD.getConversionsCostAll();
+        costHourDM.setConversionsCostTotal(adStatOfMinuteDWD.getConversionsCostAll());
         //天-总目标平均转化成本
-        costHourDM.conversionsCostDay = adStatOfMinuteDWD.getConversionsCostDay();
+        costHourDM.setConversionsCostDay(adStatOfMinuteDWD.getConversionsCostDay());
         //小时-总目标平均转化成本
-        costHourDM.conversionsCostHour = 0;
+        costHourDM.setConversionsCostHour(0);
         //总平均转化率
-        costHourDM.conversionsRateAll = adStatOfMinuteDWD.getConversionsRateAll();
+        costHourDM.setConversionsRateAll(adStatOfMinuteDWD.getConversionsRateAll());
         //天-总平均转化率
-        costHourDM.conversionsRateDay = adStatOfMinuteDWD.getConversionsRateDay();
+        costHourDM.setConversionsRateDay(adStatOfMinuteDWD.getConversionsRateDay());
         //小时-总平均转化率
-        costHourDM.conversionsRateHour = 0;
+        costHourDM.setConversionsRateHour(0);
         //TODO:总首日下单roi
-        costHourDM.firstDayOrderRoiTotal = 0;
+        costHourDM.setFirstDayOrderRoiTotal(0);
         //天-总首日下单roi
-        costHourDM.firstDayOrderRoiDay = 0;
+        costHourDM.setFirstDayOrderRoiDay(0);
         //小时-总首日下单roi
-        costHourDM.firstDayOrderRoiHour = 0;
+        costHourDM.setFirstDayOrderRoiHour(0);
         //总首日下单金额
-        costHourDM.firstDayOrderAmountTotal = adStatOfMinuteDWD.getFirstDayOrderAmountTotal();
+        costHourDM.setFirstDayOrderAmountTotal(adStatOfMinuteDWD.getFirstDayOrderAmountTotal());
         //天-总首日下单金额
-        costHourDM.firstDayOrderAmountDay = adStatOfMinuteDWD.getFirstDayOrderAmountDay();
+        costHourDM.setFirstDayOrderAmountDay(adStatOfMinuteDWD.getFirstDayOrderAmountDay());
         //小时-总首日下单金额
-        costHourDM.firstDayOrderAmountHour = 0;
+        costHourDM.setFirstDayOrderAmountHour(0);
         //总首日下单量
-        costHourDM.firstDayOrderCountTotal = adStatOfMinuteDWD.getFirstDayOrderCountTotal();
+        costHourDM.setFirstDayOrderCountTotal(adStatOfMinuteDWD.getFirstDayOrderCountTotal());
         //天-总首日下单量
-        costHourDM.firstDayOrderCountDay = adStatOfMinuteDWD.getFirstDayOrderCountDay();
+        costHourDM.setFirstDayOrderCountDay(adStatOfMinuteDWD.getFirstDayOrderCountDay());
         //小时-总首日下单量
-        costHourDM.firstDayOrderCountHour = 0;
+        costHourDM.setFirstDayOrderCountHour(0);
         //总下单金额
-        costHourDM.webOrderAmountTotal = adStatOfMinuteDWD.getOrderAmountTotal();
+        costHourDM.setWebOrderAmountTotal(adStatOfMinuteDWD.getOrderAmountTotal());
         //天-总下单金额
-        costHourDM.webOrderAmountDay = adStatOfMinuteDWD.getOrderAmountDay();
+        costHourDM.setWebOrderAmountDay(adStatOfMinuteDWD.getOrderAmountDay());
         //小时-总下单金额
-        costHourDM.webOrderAmountHour = 0;
+        costHourDM.setWebOrderAmountHour(0);
         //总平均下单成本
-        costHourDM.webOrderCostTotal = adStatOfMinuteDWD.getWebOrderCostAll();
+        costHourDM.setWebOrderCostTotal(adStatOfMinuteDWD.getWebOrderCostAll());
         //天-总平均下单成本
-        costHourDM.webOrderCostDay = adStatOfMinuteDWD.getWebOrderCostDay();
+        costHourDM.setWebOrderCostDay(adStatOfMinuteDWD.getWebOrderCostDay());
         //小时-总平均下单成本
-        costHourDM.webOrderCostHour = 0;
+        costHourDM.setWebOrderCostHour(0);
         //总平均下单率
-        costHourDM.webOrderRateTotal = adStatOfMinuteDWD.getOrderRateAll();
+        costHourDM.setWebOrderRateTotal(adStatOfMinuteDWD.getOrderRateAll());
         //天-总平均下单率
-        costHourDM.webOrderRateDay = adStatOfMinuteDWD.getOrderRateDay();
+        costHourDM.setWebOrderRateDay(adStatOfMinuteDWD.getOrderRateDay());
         //小时-总平均下单率
-        costHourDM.webOrderRateHour = 0;
+        costHourDM.setWebOrderRateHour(0);
         //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
-        costHourDM.webOrderCountTotal = adStatOfMinuteDWD.getOrderCountTotal();
+        costHourDM.setWebOrderCountTotal(adStatOfMinuteDWD.getOrderCountTotal());
         //天-总平均下单量
-        costHourDM.webOrderCountDay = adStatOfMinuteDWD.getOrderCountDay();
+        costHourDM.setWebOrderCountDay(adStatOfMinuteDWD.getOrderCountDay());
         //小时-总平均下单量
-        costHourDM.webOrderCountHour = 0;
+        costHourDM.setWebOrderCountHour(0);
         //总下单ROI
-        costHourDM.orderRoiTotal = adStatOfMinuteDWD.getOrderRoiAll();
+        costHourDM.setOrderRoiTotal(adStatOfMinuteDWD.getOrderRoiAll());
         //天-总下单roi
-        costHourDM.orderRoiDay = adStatOfMinuteDWD.getOrderRoiDay();
+        costHourDM.setOrderRoiDay(adStatOfMinuteDWD.getOrderRoiDay());
         //小时-总下单roi
-        costHourDM.orderRoiHour = 0;
+        costHourDM.setOrderRoiHour(0);
         //总平均下单客单价
-        costHourDM.orderUnitPriceTotal = adStatOfMinuteDWD.getOrderUnitPriceAll();
+        costHourDM.setOrderUnitPriceTotal(adStatOfMinuteDWD.getOrderUnitPriceAll());
         //天-总平均下单客单价
-        costHourDM.orderUnitPriceDay = adStatOfMinuteDWD.getOrderUnitPriceDay();
+        costHourDM.setOrderUnitPriceDay(adStatOfMinuteDWD.getOrderUnitPriceDay());
         //小时-总平均下单客单价
-        costHourDM.orderUnitPriceHour = 0;
+        costHourDM.setOrderUnitPriceHour(0);
         //总公众号关注量
-        costHourDM.fromFollowUvTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        costHourDM.setFromFollowUvTotal(adStatOfMinuteDWD.getOfficialAccountFollowCountTotal());
         //天-总公众号关注量
-        costHourDM.fromFollowUvDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        costHourDM.setFromFollowUvDay(adStatOfMinuteDWD.getOfficialAccountFollowCountDay());
         //小时-总公众号关注量
-        costHourDM.fromFollowUvHour = 0;
+        costHourDM.setFromFollowUvHour(0);
         //TODO:总平均公众号关注成本---是否是价格/关注
-        costHourDM.fromFollowCostTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        costHourDM.setFromFollowCostTotal(adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal());
         //天-总平均公众号关注成本
-        costHourDM.fromFollowCostDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        costHourDM.setFromFollowCostDay(adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay());
         //小时-总平均公众号关注成本
-        costHourDM.fromFollowCostHour = 0;
+        costHourDM.setFromFollowCostHour(0);
         //TODO:总平均公众号关注率----确认是否对应
-        costHourDM.fromFollowRateTotal = adStatOfMinuteDWD.getOfficialAccountFollowRateAll();
+        costHourDM.setFromFollowRateTotal(adStatOfMinuteDWD.getOfficialAccountFollowRateAll());
         //天-总平均公众号关注率
-        costHourDM.fromFollowRateDay = adStatOfMinuteDWD.getOfficialAccountFollowRateDay();
+        costHourDM.setFromFollowRateDay(adStatOfMinuteDWD.getOfficialAccountFollowRateDay());
         //小时-总平均公众号关注率
-        costHourDM.fromFollowRateHour = 0;
+        costHourDM.setFromFollowRateHour(0);
         //TODO:总注册数-----下面全是有问题的
-        costHourDM.webRegisterCountTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        costHourDM.setWebRegisterCountTotal(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal());
         //天-总注册数
-        costHourDM.webRegisterCountDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        costHourDM.setWebRegisterCountDay(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay());
         //小时-总注册数
-        costHourDM.webRegisterCountHour = 0;
+        costHourDM.setWebRegisterCountHour(0);
         //总注册人数
-        costHourDM.webRegisterUvTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        costHourDM.setWebRegisterUvTotal(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal());
         //天-总注册人数
-        costHourDM.webRegisterUvDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        costHourDM.setWebRegisterUvDay(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay());
         //小时-总注册人数
-        costHourDM.webRegisterUvHour = 0;
+        costHourDM.setWebRegisterUvHour(0);
         //总平均注册成本
-        costHourDM.webRegisterCostTotal = adStatOfMinuteDWD.getOfficialAccountRegisterCostAll();
+        costHourDM.setWebRegisterCostTotal(adStatOfMinuteDWD.getOfficialAccountRegisterCostAll());
         //天-总平均注册成本
-        costHourDM.webRegisterCostDay = adStatOfMinuteDWD.getOfficialAccountRegisterCostDay();
+        costHourDM.setWebRegisterCostDay(adStatOfMinuteDWD.getOfficialAccountRegisterCostDay());
         //小时-总平均注册成本
-        costHourDM.webRegisterCostHour = 0;
+        costHourDM.setWebRegisterCostHour(0);
+
         return costHourDM;
     }
 
@@ -255,35 +254,35 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(sql);
             while (rs.next()) {
-                costHourDM.costSpeed = rs.getDouble(1);
-                costHourDM.costHour = rs.getLong(2);
-                costHourDM.viewCountHour = rs.getLong(3);
-                costHourDM.thousandDisplayPriceHour = rs.getLong(4);
-                costHourDM.validClickCountHour = rs.getLong(5);
-                costHourDM.ctrHour = rs.getDouble(6);
-                costHourDM.cpcHour = rs.getLong(7);
-                costHourDM.conversionsCountHour = rs.getLong(8);
-                costHourDM.conversionsCostHour = rs.getLong(9);
-                costHourDM.conversionsRateHour = rs.getDouble(10);
-                costHourDM.firstDayOrderRoiHour = rs.getDouble(11);
-                costHourDM.firstDayOrderAmountHour = rs.getLong(12);
-                costHourDM.firstDayOrderCountHour = rs.getLong(13);
-                costHourDM.webOrderAmountHour = rs.getLong(14);
-                costHourDM.webOrderCostHour = rs.getLong(15);
-                costHourDM.webOrderRateHour = rs.getDouble(16);
-                costHourDM.webOrderCountHour = rs.getLong(17);
-                costHourDM.orderRoiHour = rs.getDouble(18);
-                costHourDM.orderUnitPriceHour = rs.getLong(19);
-                costHourDM.fromFollowUvHour = rs.getLong(20);
-                costHourDM.fromFollowCostHour = rs.getLong(21);
-                costHourDM.fromFollowRateHour = rs.getDouble(22);
-                costHourDM.webRegisterCountHour = rs.getLong(23);
-                costHourDM.webRegisterUvHour = rs.getLong(24);
-                costHourDM.webRegisterCostHour = rs.getDouble(25);
-                costHourDM.costLastHour = rs.getLong(26);
-                costHourDM.costLastTwoHour = rs.getLong(27);
-                costHourDM.costLastHourDiff = rs.getLong(28);
-                costHourDM.costLastThreeTrend = rs.getLong(29);
+                costHourDM.setCostSpeed(rs.getDouble(1));
+                costHourDM.setCostHour(rs.getLong(2));
+                costHourDM.setViewCountHour(rs.getLong(3));
+                costHourDM.setThousandDisplayPriceHour(rs.getLong(4));
+                costHourDM.setValidClickCountHour(rs.getLong(5));
+                costHourDM.setCtrHour(rs.getDouble(6));
+                costHourDM.setCpcHour(rs.getLong(7));
+                costHourDM.setConversionsCountHour(rs.getLong(8));
+                costHourDM.setConversionsCostHour(rs.getLong(9));
+                costHourDM.setConversionsRateHour(rs.getDouble(10));
+                costHourDM.setFirstDayOrderRoiHour(rs.getDouble(11));
+                costHourDM.setFirstDayOrderAmountHour(rs.getLong(12));
+                costHourDM.setFirstDayOrderCountHour(rs.getLong(13));
+                costHourDM.setWebOrderAmountHour(rs.getLong(14));
+                costHourDM.setWebOrderCostHour(rs.getLong(15));
+                costHourDM.setWebOrderRateHour(rs.getDouble(16));
+                costHourDM.setWebOrderCountHour(rs.getLong(17));
+                costHourDM.setOrderRoiHour(rs.getDouble(18));
+                costHourDM.setOrderUnitPriceHour(rs.getLong(19));
+                costHourDM.setFromFollowUvHour(rs.getLong(20));
+                costHourDM.setFromFollowCostHour(rs.getLong(21));
+                costHourDM.setFromFollowRateHour(rs.getDouble(22));
+                costHourDM.setWebRegisterCountHour(rs.getLong(23));
+                costHourDM.setWebRegisterUvHour(rs.getLong(24));
+                costHourDM.setWebRegisterCostHour(rs.getDouble(25));
+                costHourDM.setCostLastHour(rs.getLong(26));
+                costHourDM.setCostLastTwoHour(rs.getLong(27));
+                costHourDM.setCostLastHourDiff(rs.getLong(28));
+                costHourDM.setCostLastThreeTrend(rs.getLong(29));
             }
             CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
 

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDWDToCkBatchSink.java → flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToCkBatchSink.java

@@ -23,7 +23,7 @@ import java.util.*;
  * 批量数据写出
  */
 @Slf4j
-public class AdDWDToCkBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>> {
+public class AdDayDWDToCkBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>> {
 
     private SqlSessionFactory sqlSessionFactory;
 

+ 77 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdHourDMToCkBatchSink.java

@@ -0,0 +1,77 @@
+package flink.zanxiangnet.ad.monitoring.sink;
+
+import com.aliyun.odps.tunnel.TunnelException;
+import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
+import flink.zanxiangnet.ad.monitoring.dao.mapper.CostHourDMMapper;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.ibatis.datasource.DataSourceFactory;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * 批量数据写出
+ */
+@Slf4j
+public class AdHourDMToCkBatchSink extends RichSinkFunction<List<CostHourDM>> {
+
+    private SqlSessionFactory sqlSessionFactory;
+
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
+
+        Properties ckProps = new Properties();
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+
+        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
+        dataSourceFactory.setProperties(ckProps);
+        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
+        // 开启驼峰规则
+        configuration.setMapUnderscoreToCamelCase(true);
+        configuration.getTypeAliasRegistry().registerAlias(CostHourDM.class);
+        // addMapper一定要放到 alias的后面!!!!!
+        configuration.addMapper(CostHourDMMapper.class);
+        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
+    }
+
+    /**
+     * 将值写入到 Sink。每个值都会调用此函数
+     *
+     * @param value
+     * @param context
+     */
+    @Override
+    public void invoke(List<CostHourDM> value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            CostHourDMMapper mapper = session.getMapper(CostHourDMMapper.class);
+            mapper.addBatch(value);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+}

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/PlanDWDToCkBatchSink.java → flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/PlanDayDWDToCkBatchSink.java

@@ -25,7 +25,7 @@ import java.util.Properties;
  * 批量数据写出
  */
 @Slf4j
-public class PlanDWDToCkBatchSink extends RichSinkFunction<List<PlanStatOfDayDWD>> {
+public class PlanDayDWDToCkBatchSink extends RichSinkFunction<List<PlanStatOfDayDWD>> {
 
     private SqlSessionFactory sqlSessionFactory;