فهرست منبع

天数据实时提交

wcc 3 سال پیش
والد
کامیت
43a47e1e93

+ 12 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -2,6 +2,7 @@ package flink.zanxiangnet.ad.monitoring;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.process.*;
+import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.AdHourDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.AdMinuteDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
@@ -112,7 +113,7 @@ public class AdHourStreamJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .name("sink_ad_hour_dwd");
 
-        // 小时流-填充 0值
+        // 分钟流转小时流同时填充空白的小时
         SingleOutputStreamOperator<AdStatOfHourDWD> hourStreamFromMinute = adMinuteDWDStream.map(AdStatOfHourDWD::byMinuteDWD)
                 .keyBy(AdStatOfHourDWD::getAdId)
                 .process(new AdHourOnTimeStreamCompletionProcess());
@@ -128,6 +129,16 @@ public class AdHourStreamJob {
                 .addSink(new AdHourDMToCkBatchSink())
                 .name("sink_ad_hour_dm_clickhouse");
 
+        // 小时流转天流同时填充空白天数据
+        SingleOutputStreamOperator<AdStatOfDayDWD> dayStreamFromHour = hourStreamFromMinute.map(AdStatOfDayDWD::byHourDWD)
+                .keyBy(AdStatOfDayDWD::getAdId)
+                .process(new AdDayOnTimeStreamCompletionProcess());
+        // 写入 ck
+        new BatchStream<>("adDayDWDToCkStream", dayStreamFromHour, 500L, 60 * 1000L)
+                .toBatch()
+                .addSink(new AdDayDWDToCkBatchSink())
+                .name("ad_day_dwd_from_hour_sink");
+
         env.execute("ad_hour_stream_job");
     }
 }

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/CostHourDMMapper.xml

@@ -32,7 +32,7 @@
         INSERT INTO cost_hour(<include refid="Base_Column_List"/>)
         VALUES
         (#{item.dt}, #{item.createTime}, #{item.hour}, #{item.adId},
-        #{item.adgroupId}, #{item.accountId}, #{item.adcreativeId}, #{item.campaignId},
+        #{item.adgroupId}, #{item.adcreativeId}, #{item.accountId}, #{item.campaignId},
         #{item.costTotal}, #{item.costDay}, #{item.costHour}, #{item.costDiff},
         #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour}, #{item.costLastTwoHourDiff},
         #{item.costLastThreeHour},#{item.costLastThreeTrend}, #{item.costSpeed}, #{item.viewCountTotal}, #{item.viewCountDay},

+ 28 - 28
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/CostMinuterDMMapper.xml

@@ -23,40 +23,40 @@
     <insert id="add">
         INSERT INTO cost_minute(<include refid="Base_Column_List"/>)
         VALUES
-        (#{item.dt}, #{item.createTime}, #{item.campaignId},
-        #{item.hour}, #{item.minute}, #{item.adId},
-        #{item.agencyAccountId}, #{item.adgroupId}, #{item.adcreativeId},
-        #{item.accountId}, #{item.costMinute}, #{item.costDiff},
-        #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour},
-        #{item.costLastTwoHourDiff}, #{item.costLastThreeTrend}, #{item.costSpeed},
-        #{item.viewCountMinute}, #{item.thousandDisplayPriceMinute}, #{item.validClickCountMinute},
-        #{item.ctrMinute}, #{item.cpcMinute}, #{item.conversionsCountMinute},
-        #{item.conversionsCostMinute}, #{item.conversionsRateMinute}, #{item.firstDayOrderRoiMinute},
-        #{item.firstDayOrderAmountMinute}, #{item.firstDayOrderCountMinute}, #{item.webOrderAmountMinute},
-        #{item.webOrderCostMinute}, #{item.webOrderRateMinute}, #{item.webOrderCountMinute},
-        #{item.orderRoiMinute}, #{item.orderUnitPriceMinute}, #{item.fromFollowUvMinute},
-        #{item.fromFollowCostMinute}, #{item.fromFollowRateMinute}, #{item.webRegisterCountMinute},
-        #{item.webRegisterUvMinute}, #{item.webRegisterCostMinute})
+        (#{item.dt}, #{item.minute}, #{item.createTime},
+        #{item.hour}, #{item.adId}, #{item.adgroupId},
+        #{item.adcreativeId}, #{item.accountId}, #{item.campaignId},
+        #{item.costMinute}, #{item.costDiff}, #{item.costLastHour},
+        #{item.costLastHourDiff}, #{item.costLastTwoHour}, #{item.costLastTwoHourDiff},
+        #{item.costLastThreeTrend}, #{item.costSpeed}, #{item.viewCountMinute},
+        #{item.thousandDisplayPriceMinute}, #{item.validClickCountMinute}, #{item.ctrMinute},
+        #{item.cpcMinute}, #{item.conversionsCountMinute}, #{item.conversionsCostMinute},
+        #{item.conversionsRateMinute}, #{item.firstDayOrderRoiMinute}, #{item.firstDayOrderAmountMinute},
+        #{item.firstDayOrderCountMinute}, #{item.webOrderAmountMinute}, #{item.webOrderCostMinute},
+        #{item.webOrderRateMinute}, #{item.webOrderCountMinute}, #{item.orderRoiMinute},
+        #{item.orderUnitPriceMinute}, #{item.fromFollowUvMinute}, #{item.fromFollowCostMinute},
+        #{item.fromFollowRateMinute}, #{item.webRegisterCountMinute}, #{item.webRegisterUvMinute},
+        #{item.webRegisterCostMinute}, #{item.agencyAccountId})
     </insert>
 
     <insert id="addBatch">
         INSERT INTO cost_minute(<include refid="Base_Column_List"/>)
         VALUES
         <foreach collection="list" index="index" item="item" separator=",">
-            (#{item.dt}, #{item.createTime}, #{item.campaignId},
-            #{item.hour}, #{item.minute}, #{item.adId},
-            #{item.agencyAccountId}, #{item.adgroupId}, #{item.adcreativeId},
-            #{item.accountId}, #{item.costMinute}, #{item.costDiff},
-            #{item.costLastHour}, #{item.costLastHourDiff}, #{item.costLastTwoHour},
-            #{item.costLastTwoHourDiff}, #{item.costLastThreeTrend}, #{item.costSpeed},
-            #{item.viewCountMinute}, #{item.thousandDisplayPriceMinute}, #{item.validClickCountMinute},
-            #{item.ctrMinute}, #{item.cpcMinute}, #{item.conversionsCountMinute},
-            #{item.conversionsCostMinute}, #{item.conversionsRateMinute}, #{item.firstDayOrderRoiMinute},
-            #{item.firstDayOrderAmountMinute}, #{item.firstDayOrderCountMinute}, #{item.webOrderAmountMinute},
-            #{item.webOrderCostMinute}, #{item.webOrderRateMinute}, #{item.webOrderCountMinute},
-            #{item.orderRoiMinute}, #{item.orderUnitPriceMinute}, #{item.fromFollowUvMinute},
-            #{item.fromFollowCostMinute}, #{item.fromFollowRateMinute}, #{item.webRegisterCountMinute},
-            #{item.webRegisterUvMinute}, #{item.webRegisterCostMinute})
+            (#{item.dt}, #{item.minute}, #{item.createTime},
+            #{item.hour}, #{item.adId}, #{item.adgroupId},
+            #{item.adcreativeId}, #{item.accountId}, #{item.campaignId},
+            #{item.costMinute}, #{item.costDiff}, #{item.costLastHour},
+            #{item.costLastHourDiff}, #{item.costLastTwoHour}, #{item.costLastTwoHourDiff},
+            #{item.costLastThreeTrend}, #{item.costSpeed}, #{item.viewCountMinute},
+            #{item.thousandDisplayPriceMinute}, #{item.validClickCountMinute}, #{item.ctrMinute},
+            #{item.cpcMinute}, #{item.conversionsCountMinute}, #{item.conversionsCostMinute},
+            #{item.conversionsRateMinute}, #{item.firstDayOrderRoiMinute}, #{item.firstDayOrderAmountMinute},
+            #{item.firstDayOrderCountMinute}, #{item.webOrderAmountMinute}, #{item.webOrderCostMinute},
+            #{item.webOrderRateMinute}, #{item.webOrderCountMinute}, #{item.orderRoiMinute},
+            #{item.orderUnitPriceMinute}, #{item.fromFollowUvMinute}, #{item.fromFollowCostMinute},
+            #{item.fromFollowRateMinute}, #{item.webRegisterCountMinute}, #{item.webRegisterUvMinute},
+            #{item.webRegisterCostMinute}, #{item.agencyAccountId})
         </foreach>
     </insert>
 </mapper>

+ 6 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/kafka/KafkaComponent.java

@@ -17,11 +17,11 @@ public class KafkaComponent {
         public static final String adDayTopic = "ad_day_cost_topic";
 
         public static class KafkaGroupId {
-            public static final String adHourConsumerGroup = "ad_hour_consumer";
-            public static final String adDayConsumerGroup = "ad_day_consumer";
+            public static final String adHourConsumerGroup = "ad_hour_consumer1";
+            public static final String adDayConsumerGroup = "ad_day_consumer1";
 
-            public static final String planHourConsumerGroup = "plan_hour_consumer";
-            public static final String planDayConsumerGroup = "plan_day_consumer";
+            public static final String planHourConsumerGroup = "plan_hour_consumer1";
+            public static final String planDayConsumerGroup = "plan_day_consumer1";
         }
     }
 
@@ -40,6 +40,8 @@ public class KafkaComponent {
         //每次Poll的最大数量。
         //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
         kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
+        kafkaProps.put("commit.offsets.on.checkpoint", "true");
+        kafkaProps.put("register.consumer.metrics", "true");
 
         //Hostname校验改成空。
         kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

+ 90 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfDayDWD.java

@@ -4,12 +4,14 @@ import com.aliyun.odps.data.Record;
 import com.google.gson.annotations.SerializedName;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeColumn;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.ObjectUtil;
 import lombok.Data;
 import org.springframework.beans.BeanUtils;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.time.LocalDate;
 import java.util.Date;
 
 @Data
@@ -1454,6 +1456,94 @@ public class AdStatOfDayDWD implements Serializable {
         return result;
     }
 
+    public static AdStatOfDayDWD completion(LocalDate completionDay, AdStatOfDayDWD lastReduce) {
+        String statDay = DateUtil.formatLocalDate(completionDay);
+        AdStatOfDayDWD result = new AdStatOfDayDWD();
+        BeanUtils.copyProperties(lastReduce, result);
+        result.setCreateTime(new Date());
+        result.setStatDay(statDay);
+        result.setCostDeviationRateDay(0.0);
+        result.setCostDay(0L);
+        result.setCompensationAmountDay(0L);
+        result.setViewCountDay(0L);
+        result.setThousandDisplayPriceDay(0L);
+        result.setViewUserCountDay(0L);
+        result.setAvgViewPerUserDay(0.0);
+        result.setValidClickCountDay(0L);
+        result.setClickUserCountDay(0L);
+        result.setCtrDay(0.0);
+        result.setCpcDay(0L);
+        result.setValuableClickCountDay(0L);
+        result.setValuableClickRateDay(0.0);
+        result.setValuableClickCostDay(0L);
+        result.setConversionsCountDay(0L);
+        result.setConversionsCostDay(0L);
+        result.setConversionsRateDay(0.0);
+        result.setDeepConversionsCountDay(0L);
+        result.setDeepConversionsCostDay(0L);
+        result.setDeepConversionsRateDay(0.0);
+        result.setKeyPageUvDay(0L);
+        result.setOrderCountDay(0L);
+        result.setFirstDayOrderCountDay(0L);
+        result.setWebOrderCostDay(0L);
+        result.setOrderRateDay(0.0);
+        result.setOrderAmountDay(0L);
+        result.setFirstDayOrderAmountDay(0L);
+        result.setOrderUnitPriceDay(0L);
+        result.setOrderRoiDay(0.0);
+        result.setSignInCountDay(0L);
+        result.setAddWishlistCountDay(0L);
+        result.setViewCommodityPageUvDay(0L);
+        result.setPageReservationCountDay(0L);
+        result.setLeadsPurchaseUvDay(0L);
+        result.setLeadsPurchaseCostDay(0L);
+        result.setLeadsPurchaseRateDay(0.0);
+        result.setScanFollowCountDay(0L);
+        result.setWechatAppRegisterUvDay(0L);
+        result.setWechatMinigameRegisterCostDay(0L);
+        result.setWechatMinigameRegisterRateDay(0.0);
+        result.setWechatMinigameArpuDay(0.0);
+        result.setWechatMinigameRetentionCountDay(0L);
+        result.setWechatMinigameCheckoutCountDay(0L);
+        result.setWechatMinigameCheckoutAmountDay(0L);
+        result.setOfficialAccountFollowCountDay(0L);
+        result.setOfficialAccountFollowCostDay(0L);
+        result.setOfficialAccountFollowRateDay(0.0);
+        result.setOfficialAccountRegisterUserCountDay(0L);
+        result.setOfficialAccountRegisterRateDay(0.0);
+        result.setOfficialAccountRegisterCostDay(0L);
+        result.setOfficialAccountRegisterAmountDay(0L);
+        result.setOfficialAccountRegisterRoiDay(0L);
+        result.setOfficialAccountApplyCountDay(0L);
+        result.setOfficialAccountApplyUserCountDay(0L);
+        result.setOfficialAccountApplyRateDay(0.0);
+        result.setOfficialAccountApplyCostDay(0L);
+        result.setOfficialAccountApplyAmountDay(0L);
+        result.setOfficialAccountApplyRoiDay(0L);
+        result.setOfficialAccountOrderCountDay(0L);
+        result.setOfficialAccountFirstDayOrderCountDay(0L);
+        result.setOfficialAccountOrderUserCountDay(0L);
+        result.setOfficialAccountOrderRateDay(0.0);
+        result.setOfficialAccountOrderCostDay(0L);
+        result.setOfficialAccountOrderAmountDay(0L);
+        result.setOfficialAccountFirstDayOrderAmountDay(0L);
+        result.setOfficialAccountOrderRoiDay(0L);
+        result.setOfficialAccountConsultCountDay(0L);
+        result.setOfficialAccountReaderCountDay(0L);
+        result.setOfficialAccountCreditApplyUserCountDay(0L);
+        result.setOfficialAccountCreditUserCountDay(0L);
+        result.setForwardCountDay(0L);
+        result.setForwardUserCountDay(0L);
+        result.setNoInterestCountDay(0L);
+        return result;
+    }
+
+    public static AdStatOfDayDWD byHourDWD(AdStatOfHourDWD hourDWD) {
+        AdStatOfDayDWD result = new AdStatOfDayDWD();
+        BeanUtils.copyProperties(hourDWD, result);
+        return result;
+    }
+
     public void removeNull() {
 
         if (statDay == null) {

+ 101 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java

@@ -0,0 +1,101 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.ibatis.datasource.DataSourceFactory;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * 小时流数据补全,把中间没消耗的时间段填充 0
+ */
+@Slf4j
+public class AdDayOnTimeStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfDayDWD, AdStatOfDayDWD> {
+
+    private SqlSessionFactory sqlSessionFactory;
+    // 上次查询的时间
+    private ValueState<String> lastQueryDayState;
+
+    @Override
+    public void open(Configuration conf) {
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
+
+        Properties ckProps = new Properties();
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+
+        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
+        dataSourceFactory.setProperties(ckProps);
+        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
+        // 开启驼峰规则
+        configuration.setMapUnderscoreToCamelCase(true);
+        configuration.getTypeAliasRegistry().registerAlias(AdStatOfDayDWD.class);
+        // addMapper一定要放到 alias的后面!!!!!
+        configuration.addMapper(AdStatOfDayDWDMapper.class);
+        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
+
+        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+    }
+
+    @Override
+    public void processElement(AdStatOfDayDWD adStatOfDayDWD, KeyedProcessFunction<Long, AdStatOfDayDWD, AdStatOfDayDWD>.Context context, Collector<AdStatOfDayDWD> collector) throws Exception {
+        String lastQueryTime = lastQueryDayState.value();
+        if (lastQueryTime != null && lastQueryTime.compareTo(adStatOfDayDWD.getStatDay()) >= 0) {
+            collector.collect(adStatOfDayDWD);
+            return;
+        }
+        lastQueryDayState.update(adStatOfDayDWD.getStatDay());
+        LocalDate statDay = DateUtil.parseLocalDate(adStatOfDayDWD.getStatDay());
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
+            List<AdStatOfDayDWD> lastReduceList = mapper.lastReduceResult(adStatOfDayDWD.getAdId(), null, DateUtil.formatLocalDate(statDay.minusDays(1L)), 1);
+            if (lastReduceList.isEmpty()) {
+                collector.collect(adStatOfDayDWD);
+                return;
+            }
+            AdStatOfDayDWD lastReduce = lastReduceList.get(0);
+            LocalDate statDate = DateUtil.parseLocalDate(adStatOfDayDWD.getStatDay());
+            LocalDate lastStatDate = DateUtil.parseLocalDate(lastReduce.getStatDay());
+            if (lastStatDate.compareTo(statDate) >= 0) {
+                collector.collect(adStatOfDayDWD);
+                return;
+            }
+            long days = DateUtil.intervalOfDays(lastStatDate, statDate);
+            if (days > 1) {
+                long start = System.currentTimeMillis();
+                // 中间有没数据的时间段,需要进行数据填充
+                for (int i = 1; i < days; i++) {
+                    // 要填充的时间
+                    LocalDate completionDay = lastStatDate.plusDays(i);
+                    collector.collect(AdStatOfDayDWD.completion(completionDay, lastReduce));
+                }
+                if (days > 12) {
+                    log.error("实时天:给 adId[{}]造数据用时 {},总计造了:{},起始时间:{},截至时间;{}", adStatOfDayDWD.getAdId(), (System.currentTimeMillis() - start), days - 1, lastStatDate, statDate);
+                }
+            }
+            collector.collect(adStatOfDayDWD);
+        }
+    }
+}

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java

@@ -47,7 +47,7 @@ public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lo
                     collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
                 }
                 if (hours > 12) {
-                    log.error("实时:给 adId[{}]造数据用时 {},总计造了:{},起始时间:{},截至时间;{}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start), hours - 1, lastStatDateTime, statDateTime);
+                    log.error("实时小时:给 adId[{}]造数据用时 {},总计造了:{},起始时间:{},截至时间;{}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start), hours - 1, lastStatDateTime, statDateTime);
                 }
             }
             lastReduceState.update(adStatOfHourDWD);

+ 0 - 63
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourRollbackStreamCompletionProcess.java

@@ -1,63 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-
-/**
- * 小时流数据补全,把中间没消耗的时间段填充 0
- */
-@Slf4j
-public class AdHourRollbackStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
-
-    private ValueState<AdStatOfHourDWD> lastReduceState;
-
-    @Override
-    public void open(Configuration conf) {
-        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class)));
-    }
-
-    @Override
-    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD>.Context context, Collector<AdStatOfHourDWD> collector) throws Exception {
-        AdStatOfHourDWD lastReduce = lastReduceState.value();
-        if (lastReduce == null) {
-            lastReduceState.update(adStatOfHourDWD);
-            collector.collect(adStatOfHourDWD);
-            return;
-        }
-        LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay()), LocalTime.of(adStatOfHourDWD.getHour(), 0, 0));
-        LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
-        if (lastStatDateTime.compareTo(statDateTime) >= 0) {
-            lastReduceState.update(adStatOfHourDWD);
-            collector.collect(adStatOfHourDWD);
-            return;
-        }
-        long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
-        if (hours > 1) {
-            if (hours > 24) {
-                log.error("回滚:adId[{}] 要造 {}条数据, 从 {} 到 {}结束", adStatOfHourDWD.getAdId(), hours, lastStatDateTime, statDateTime);
-            }
-            long start = System.currentTimeMillis();
-            // 中间有没数据的时间段,需要进行数据填充
-            for (int i = 1; i < (hours >= 24 ? 24 : hours); i++) {
-                // 要填充的时间
-                LocalDateTime completionTime = lastStatDateTime.plusHours(i);
-                collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
-            }
-            if (hours > 24) {
-                log.error("回滚:给 adId[{}]造数据用时 {}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start));
-            }
-        }
-        collector.collect(adStatOfHourDWD);
-        lastReduceState.update(adStatOfHourDWD);
-    }
-}