Forráskód Böngészése

优化实时处理的逻辑

wcc 3 éve
szülő
commit
ce40cc083f

+ 8 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -54,9 +54,6 @@ public class AdHourStreamJob {
         // 广告小时数据(往前回滚 10天)
         final OutputTag<AdDataOfHourODS> adHourStreamTag = new OutputTag<AdDataOfHourODS>("adHourStream") {
         };
-        // 广告小时数据(从分钟数据流切出来的整点数据)
-        final OutputTag<AdStatOfHourDWD> adHourFromMinuteStreamTag = new OutputTag<AdStatOfHourDWD>("adHourFromMinuteStream") {
-        };
 
         // 对流进行映射,拆分(实时的分钟流和回滚的小时流)
         SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn.filter(StringUtils::isNotBlank)
@@ -79,13 +76,13 @@ public class AdHourStreamJob {
                 // 开一个 5分钟的滚动窗口
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new AdMinuteODSStreamTrigger())
-                .process(new AdMinuteDWDProcess(adHourFromMinuteStreamTag));
+                .process(new AdMinuteDWDProcess());
         new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
                 .name("sink_ad_minute_dwd");
 
-        //cost----分钟数据处理
+        //分钟流-写入 ck
         SingleOutputStreamOperator<CostMinuterDM> clickhouseMinuteDmStream =
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
@@ -109,10 +106,13 @@ public class AdHourStreamJob {
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
                         .process(new AdHourDWDProcess());
 
-        DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.getSideOutput(adHourFromMinuteStreamTag)
+        DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.map(AdStatOfHourDWD::byMinuteDWD)
                 .keyBy(AdStatOfHourDWD::getAdId)
-                .process(new AdHourStreamCompletionProcess())
-                .union(adHourDWDStream);
+                .process(new AdHourOnTimeStreamCompletionProcess())
+                .union(adHourDWDStream
+                        .keyBy(AdStatOfHourDWD::getAdId)
+                        .process(new AdHourRollbackStreamCompletionProcess())
+                );
         new KeyedBatchStream<>("adHourDWDStream", adHourDWDAllStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))

+ 143 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfHourDWD.java

@@ -5,10 +5,12 @@ import com.google.gson.annotations.SerializedName;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeColumn;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.ObjectUtil;
 import lombok.Data;
 import org.springframework.beans.BeanUtils;
 
+import java.time.LocalDateTime;
 import java.util.Date;
 
 /**
@@ -1838,6 +1840,147 @@ public class AdStatOfHourDWD {
         return result;
     }
 
+    public static AdStatOfHourDWD completion(LocalDateTime completionTime, AdStatOfHourDWD lastReduceData) {
+        String statDay = DateUtil.formatLocalDate(completionTime.toLocalDate());
+        int hour = completionTime.getHour();
+        AdStatOfHourDWD result = new AdStatOfHourDWD();
+        BeanUtils.copyProperties(lastReduceData, result);
+        result.setCreateTime(new Date());
+        result.setStatDay(statDay);
+        result.setHour(hour);
+        AdStatOfHourDWD.initValue(result);
+
+        result.setCostDeviationRateTotal(lastReduceData.getCostDeviationRateTotal());
+        result.setCostTotal(lastReduceData.getCostTotal());
+        result.setCompensationAmountTotal(lastReduceData.getCompensationAmountTotal());
+        result.setViewCountTotal(lastReduceData.getViewCountTotal());
+        result.setThousandDisplayPriceAll(lastReduceData.getThousandDisplayPriceAll());
+        result.setValidClickCountTotal(lastReduceData.getValidClickCountTotal());
+        result.setCtrAll(lastReduceData.getCtrAll());
+        result.setCpcAll(lastReduceData.getCpcAll());
+        result.setValuableClickCountTotal(lastReduceData.getValuableClickCountTotal());
+        result.setValuableClickRateAll(lastReduceData.getValuableClickRateAll());
+        result.setValuableClickCostAll(lastReduceData.getValuableClickCostAll());
+        result.setConversionsCountTotal(lastReduceData.getConversionsCountTotal());
+        result.setConversionsCostAll(lastReduceData.getConversionsCostAll());
+        result.setConversionsRateAll(lastReduceData.getConversionsRateAll());
+        result.setDeepConversionsCountTotal(lastReduceData.getDeepConversionsCountTotal());
+        result.setDeepConversionsCostAll(lastReduceData.getDeepConversionsCostAll());
+        result.setDeepConversionsRateAll(lastReduceData.getDeepConversionsRateAll());
+        result.setOrderCountTotal(lastReduceData.getOrderCountTotal());
+        result.setFirstDayOrderCountTotal(lastReduceData.getFirstDayOrderCountTotal());
+        result.setWebOrderCostAll(lastReduceData.getWebOrderCostAll());
+        result.setOrderRateAll(lastReduceData.getOrderRateAll());
+        result.setOrderAmountTotal(lastReduceData.getOrderAmountTotal());
+        result.setFirstDayOrderAmountTotal(lastReduceData.getFirstDayOrderAmountTotal());
+        result.setOrderUnitPriceAll(lastReduceData.getOrderUnitPriceAll());
+        result.setOrderRoiAll(lastReduceData.getOrderRoiAll());
+        result.setSignInCountTotal(lastReduceData.getSignInCountTotal());
+        result.setScanFollowCountTotal(lastReduceData.getScanFollowCountTotal());
+        result.setWechatAppRegisterUvTotal(lastReduceData.getWechatAppRegisterUvTotal());
+        result.setWechatMinigameRegisterCostAll(lastReduceData.getWechatMinigameRegisterCostAll());
+        result.setWechatMinigameRegisterRateAll(lastReduceData.getWechatMinigameRegisterRateAll());
+        result.setWechatMinigameArpuAll(lastReduceData.getWechatMinigameArpuAll());
+        result.setWechatMinigameRetentionCountTotal(lastReduceData.getWechatMinigameRetentionCountTotal());
+        result.setWechatMinigameCheckoutCountTotal(lastReduceData.getWechatMinigameCheckoutCountTotal());
+        result.setWechatMinigameCheckoutAmountTotal(lastReduceData.getWechatMinigameCheckoutAmountTotal());
+        result.setOfficialAccountFollowCountTotal(lastReduceData.getOfficialAccountFollowCountTotal());
+        result.setOfficialAccountFollowRateAll(lastReduceData.getOfficialAccountFollowRateAll());
+        result.setOfficialAccountRegisterUserCountTotal(lastReduceData.getOfficialAccountRegisterUserCountTotal());
+        result.setOfficialAccountRegisterRateAll(lastReduceData.getOfficialAccountRegisterRateAll());
+        result.setOfficialAccountRegisterCostAll(lastReduceData.getOfficialAccountRegisterCostAll());
+        result.setOfficialAccountRegisterAmountTotal(lastReduceData.getOfficialAccountRegisterAmountTotal());
+        result.setOfficialAccountRegisterRoiAll(lastReduceData.getOfficialAccountRegisterRoiAll());
+        result.setOfficialAccountApplyCountTotal(lastReduceData.getOfficialAccountApplyCountTotal());
+        result.setOfficialAccountApplyUserCountTotal(lastReduceData.getOfficialAccountApplyUserCountTotal());
+        result.setOfficialAccountApplyRateAll(lastReduceData.getOfficialAccountApplyRateAll());
+        result.setOfficialAccountApplyCostAll(lastReduceData.getOfficialAccountApplyCostAll());
+        result.setOfficialAccountApplyAmountTotal(lastReduceData.getOfficialAccountApplyAmountTotal());
+        result.setOfficialAccountApplyRoiAll(lastReduceData.getOfficialAccountApplyRoiAll());
+        result.setOfficialAccountOrderCountTotal(lastReduceData.getOfficialAccountOrderCountTotal());
+        result.setOfficialAccountFirstDayOrderCountTotal(lastReduceData.getOfficialAccountFirstDayOrderCountTotal());
+        result.setOfficialAccountOrderUserCountTotal(lastReduceData.getOfficialAccountOrderUserCountTotal());
+        result.setOfficialAccountOrderRateAll(lastReduceData.getOfficialAccountOrderRateAll());
+        result.setOfficialAccountOrderCostAll(lastReduceData.getOfficialAccountOrderCostAll());
+        result.setOfficialAccountOrderAmountTotal(lastReduceData.getOfficialAccountOrderAmountTotal());
+        result.setOfficialAccountFirstDayOrderAmountTotal(lastReduceData.getOfficialAccountFirstDayOrderAmountTotal());
+        result.setOfficialAccountOrderRoiAll(lastReduceData.getOfficialAccountOrderRoiAll());
+        result.setOfficialAccountConsultCountTotal(lastReduceData.getOfficialAccountConsultCountTotal());
+        result.setOfficialAccountReaderCountTotal(lastReduceData.getOfficialAccountReaderCountTotal());
+        result.setOfficialAccountCreditApplyUserCountTotal(lastReduceData.getOfficialAccountCreditApplyUserCountTotal());
+        result.setOfficialAccountCreditUserCountTotal(lastReduceData.getOfficialAccountCreditUserCountTotal());
+        result.setForwardCountTotal(lastReduceData.getForwardCountTotal());
+        result.setForwardUserCountTotal(lastReduceData.getForwardUserCountTotal());
+        result.setNoInterestCountTotal(lastReduceData.getNoInterestCountTotal());
+        result.setNoInterestCountHour(lastReduceData.getNoInterestCountHour());
+
+        if (statDay.equals(lastReduceData.getStatDay())) {
+            result.setCostDeviationRateDay(lastReduceData.getCostDeviationRateDay());
+            result.setCostDay(lastReduceData.getCostDay());
+            result.setCompensationAmountDay(lastReduceData.getCompensationAmountDay());
+            result.setViewCountDay(lastReduceData.getViewCountDay());
+            result.setThousandDisplayPriceDay(lastReduceData.getThousandDisplayPriceDay());
+            result.setValidClickCountDay(lastReduceData.getValidClickCountDay());
+            result.setCtrDay(lastReduceData.getCtrDay());
+            result.setCpcDay(lastReduceData.getCpcDay());
+            result.setValuableClickCountDay(lastReduceData.getValuableClickCountDay());
+            result.setValuableClickRateDay(lastReduceData.getValuableClickRateDay());
+            result.setValuableClickCostDay(lastReduceData.getValuableClickCostDay());
+            result.setConversionsCountDay(lastReduceData.getConversionsCountDay());
+            result.setConversionsCostDay(lastReduceData.getConversionsCostDay());
+            result.setConversionsRateDay(lastReduceData.getConversionsRateDay());
+            result.setDeepConversionsCountDay(lastReduceData.getDeepConversionsCountDay());
+            result.setDeepConversionsCostDay(lastReduceData.getDeepConversionsCostDay());
+            result.setDeepConversionsRateDay(lastReduceData.getDeepConversionsRateDay());
+            result.setOrderCountDay(lastReduceData.getOrderCountDay());
+            result.setFirstDayOrderCountDay(lastReduceData.getFirstDayOrderCountDay());
+            result.setWebOrderCostDay(lastReduceData.getWebOrderCostDay());
+            result.setOrderRateDay(lastReduceData.getOrderRateDay());
+            result.setOrderAmountDay(lastReduceData.getOrderAmountDay());
+            result.setFirstDayOrderAmountDay(lastReduceData.getFirstDayOrderAmountDay());
+            result.setOrderUnitPriceDay(lastReduceData.getOrderUnitPriceDay());
+            result.setOrderRoiDay(lastReduceData.getOrderRoiDay());
+            result.setSignInCountDay(lastReduceData.getSignInCountDay());
+            result.setScanFollowCountDay(lastReduceData.getScanFollowCountDay());
+            result.setWechatAppRegisterUvDay(lastReduceData.getWechatAppRegisterUvDay());
+            result.setWechatMinigameRegisterCostDay(lastReduceData.getWechatMinigameRegisterCostDay());
+            result.setWechatMinigameRegisterRateDay(lastReduceData.getWechatMinigameRegisterRateDay());
+            result.setWechatMinigameArpuDay(lastReduceData.getWechatMinigameArpuDay());
+            result.setWechatMinigameRetentionCountDay(lastReduceData.getWechatMinigameRetentionCountDay());
+            result.setWechatMinigameCheckoutCountDay(lastReduceData.getWechatMinigameCheckoutCountDay());
+            result.setWechatMinigameCheckoutAmountDay(lastReduceData.getWechatMinigameCheckoutAmountDay());
+            result.setOfficialAccountFollowCountDay(lastReduceData.getOfficialAccountFollowCountDay());
+            result.setOfficialAccountFollowRateDay(lastReduceData.getOfficialAccountFollowRateDay());
+            result.setOfficialAccountRegisterUserCountDay(lastReduceData.getOfficialAccountRegisterUserCountDay());
+            result.setOfficialAccountRegisterRateDay(lastReduceData.getOfficialAccountRegisterRateDay());
+            result.setOfficialAccountRegisterCostDay(lastReduceData.getOfficialAccountRegisterCostDay());
+            result.setOfficialAccountRegisterAmountDay(lastReduceData.getOfficialAccountRegisterAmountDay());
+            result.setOfficialAccountRegisterRoiDay(lastReduceData.getOfficialAccountRegisterRoiDay());
+            result.setOfficialAccountApplyCountDay(lastReduceData.getOfficialAccountApplyCountDay());
+            result.setOfficialAccountApplyUserCountDay(lastReduceData.getOfficialAccountApplyUserCountDay());
+            result.setOfficialAccountApplyRateDay(lastReduceData.getOfficialAccountApplyRateDay());
+            result.setOfficialAccountApplyCostDay(lastReduceData.getOfficialAccountApplyCostDay());
+            result.setOfficialAccountApplyAmountDay(lastReduceData.getOfficialAccountApplyAmountDay());
+            result.setOfficialAccountApplyRoiDay(lastReduceData.getOfficialAccountApplyRoiDay());
+            result.setOfficialAccountOrderCountDay(lastReduceData.getOfficialAccountOrderCountDay());
+            result.setOfficialAccountFirstDayOrderCountDay(lastReduceData.getOfficialAccountFirstDayOrderCountDay());
+            result.setOfficialAccountOrderUserCountDay(lastReduceData.getOfficialAccountOrderUserCountDay());
+            result.setOfficialAccountOrderRateDay(lastReduceData.getOfficialAccountOrderRateDay());
+            result.setOfficialAccountOrderCostDay(lastReduceData.getOfficialAccountOrderCostDay());
+            result.setOfficialAccountOrderAmountDay(lastReduceData.getOfficialAccountOrderAmountDay());
+            result.setOfficialAccountFirstDayOrderAmountDay(lastReduceData.getOfficialAccountFirstDayOrderAmountDay());
+            result.setOfficialAccountOrderRoiDay(lastReduceData.getOfficialAccountOrderRoiDay());
+            result.setOfficialAccountConsultCountDay(lastReduceData.getOfficialAccountConsultCountDay());
+            result.setOfficialAccountReaderCountDay(lastReduceData.getOfficialAccountReaderCountDay());
+            result.setOfficialAccountCreditApplyUserCountDay(lastReduceData.getOfficialAccountCreditApplyUserCountDay());
+            result.setOfficialAccountCreditUserCountDay(lastReduceData.getOfficialAccountCreditUserCountDay());
+            result.setForwardCountDay(lastReduceData.getForwardCountDay());
+            result.setForwardUserCountDay(lastReduceData.getForwardUserCountDay());
+            result.setNoInterestCountDay(lastReduceData.getNoInterestCountDay());
+        }
+        return result;
+    }
+
     public static void initValue(AdStatOfHourDWD bean) {
         bean.setCostDeviationRateTotal(0.0);
         bean.setCostDeviationRateDay(0.0);

+ 51 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java

@@ -0,0 +1,51 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+/**
+ * 小时流数据补全,把中间没消耗的时间段填充 0
+ */
+public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
+
+    private ValueState<AdStatOfHourDWD> lastReduceState;
+
+    @Override
+    public void open(Configuration conf) {
+        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class)));
+    }
+
+    @Override
+    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD>.Context context, Collector<AdStatOfHourDWD> collector) throws Exception {
+        AdStatOfHourDWD lastReduce = lastReduceState.value();
+        if (lastReduce == null) {
+            lastReduceState.update(adStatOfHourDWD);
+            collector.collect(adStatOfHourDWD);
+            return;
+        }
+        LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay()), LocalTime.of(adStatOfHourDWD.getHour(), 0, 0));
+        LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
+        if (lastStatDateTime.compareTo(statDateTime) <= 0) {
+            long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
+            if (hours > 1) {
+                // 中间有没数据的时间段,需要进行数据填充
+                for (int i = 1; i < hours; i++) {
+                    // 要填充的时间
+                    LocalDateTime completionTime = lastStatDateTime.plusHours(i);
+                    collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
+                }
+            }
+            lastReduceState.update(adStatOfHourDWD);
+        }
+        collector.collect(adStatOfHourDWD);
+    }
+}

+ 54 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourRollbackStreamCompletionProcess.java

@@ -0,0 +1,54 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+/**
+ * 小时流数据补全,把中间没消耗的时间段填充 0
+ */
+public class AdHourRollbackStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
+
+    private ValueState<AdStatOfHourDWD> lastReduceState;
+
+    @Override
+    public void open(Configuration conf) {
+        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class)));
+    }
+
+    @Override
+    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD>.Context context, Collector<AdStatOfHourDWD> collector) throws Exception {
+        AdStatOfHourDWD lastReduce = lastReduceState.value();
+        if (lastReduce == null) {
+            lastReduceState.update(adStatOfHourDWD);
+            collector.collect(adStatOfHourDWD);
+            return;
+        }
+        LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay()), LocalTime.of(adStatOfHourDWD.getHour(), 0, 0));
+        LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
+        if (lastStatDateTime.compareTo(statDateTime) >= 0) {
+            lastReduceState.update(adStatOfHourDWD);
+            collector.collect(adStatOfHourDWD);
+            return;
+        }
+        long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
+        if (hours > 1) {
+            // 中间有没数据的时间段,需要进行数据填充
+            for (int i = 1; i < hours; i++) {
+                // 要填充的时间
+                LocalDateTime completionTime = lastStatDateTime.plusHours(i);
+                collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
+            }
+        }
+        collector.collect(adStatOfHourDWD);
+        lastReduceState.update(adStatOfHourDWD);
+    }
+}

+ 0 - 195
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourStreamCompletionProcess.java

@@ -1,195 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.springframework.beans.BeanUtils;
-
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.Date;
-
-/**
- * 小时流数据补全,把中间没消耗的时间段填充 0
- */
-public class AdHourStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
-
-    private ValueState<AdStatOfHourDWD> lastReduceState;
-
-    @Override
-    public void open(Configuration conf) {
-        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class)));
-    }
-
-    @Override
-    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD>.Context context, Collector<AdStatOfHourDWD> collector) throws Exception {
-        AdStatOfHourDWD lastReduce = lastReduceState.value();
-        if (lastReduce == null) {
-            lastReduceState.update(adStatOfHourDWD);
-            collector.collect(adStatOfHourDWD);
-            return;
-        }
-        LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay()), LocalTime.of(adStatOfHourDWD.getHour(), 0, 0));
-        LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
-        if (lastStatDateTime.compareTo(statDateTime) >= 0) {
-            return;
-        }
-        long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
-        if (hours > 1) {
-            // 中间有没数据的时间段,需要进行数据填充
-            for (int i = 1; i < hours; i++) {
-                // 要填充的时间
-                LocalDateTime completionTime = lastStatDateTime.plusHours(i);
-                collector.collect(completion(completionTime, lastReduce));
-            }
-        }
-        collector.collect(adStatOfHourDWD);
-        lastReduceState.update(adStatOfHourDWD);
-    }
-
-    private AdStatOfHourDWD completion(LocalDateTime completionTime, AdStatOfHourDWD lastReduceData) {
-        String statDay = DateUtil.formatLocalDate(completionTime.toLocalDate());
-        int hour = completionTime.getHour();
-        AdStatOfHourDWD result = new AdStatOfHourDWD();
-        BeanUtils.copyProperties(lastReduceData, result);
-        result.setCreateTime(new Date());
-        result.setStatDay(statDay);
-        result.setHour(hour);
-        AdStatOfHourDWD.initValue(result);
-
-        result.setCostDeviationRateTotal(lastReduceData.getCostDeviationRateTotal());
-        result.setCostTotal(lastReduceData.getCostTotal());
-        result.setCompensationAmountTotal(lastReduceData.getCompensationAmountTotal());
-        result.setViewCountTotal(lastReduceData.getViewCountTotal());
-        result.setThousandDisplayPriceAll(lastReduceData.getThousandDisplayPriceAll());
-        result.setValidClickCountTotal(lastReduceData.getValidClickCountTotal());
-        result.setCtrAll(lastReduceData.getCtrAll());
-        result.setCpcAll(lastReduceData.getCpcAll());
-        result.setValuableClickCountTotal(lastReduceData.getValuableClickCountTotal());
-        result.setValuableClickRateAll(lastReduceData.getValuableClickRateAll());
-        result.setValuableClickCostAll(lastReduceData.getValuableClickCostAll());
-        result.setConversionsCountTotal(lastReduceData.getConversionsCountTotal());
-        result.setConversionsCostAll(lastReduceData.getConversionsCostAll());
-        result.setConversionsRateAll(lastReduceData.getConversionsRateAll());
-        result.setDeepConversionsCountTotal(lastReduceData.getDeepConversionsCountTotal());
-        result.setDeepConversionsCostAll(lastReduceData.getDeepConversionsCostAll());
-        result.setDeepConversionsRateAll(lastReduceData.getDeepConversionsRateAll());
-        result.setOrderCountTotal(lastReduceData.getOrderCountTotal());
-        result.setFirstDayOrderCountTotal(lastReduceData.getFirstDayOrderCountTotal());
-        result.setWebOrderCostAll(lastReduceData.getWebOrderCostAll());
-        result.setOrderRateAll(lastReduceData.getOrderRateAll());
-        result.setOrderAmountTotal(lastReduceData.getOrderAmountTotal());
-        result.setFirstDayOrderAmountTotal(lastReduceData.getFirstDayOrderAmountTotal());
-        result.setOrderUnitPriceAll(lastReduceData.getOrderUnitPriceAll());
-        result.setOrderRoiAll(lastReduceData.getOrderRoiAll());
-        result.setSignInCountTotal(lastReduceData.getSignInCountTotal());
-        result.setScanFollowCountTotal(lastReduceData.getScanFollowCountTotal());
-        result.setWechatAppRegisterUvTotal(lastReduceData.getWechatAppRegisterUvTotal());
-        result.setWechatMinigameRegisterCostAll(lastReduceData.getWechatMinigameRegisterCostAll());
-        result.setWechatMinigameRegisterRateAll(lastReduceData.getWechatMinigameRegisterRateAll());
-        result.setWechatMinigameArpuAll(lastReduceData.getWechatMinigameArpuAll());
-        result.setWechatMinigameRetentionCountTotal(lastReduceData.getWechatMinigameRetentionCountTotal());
-        result.setWechatMinigameCheckoutCountTotal(lastReduceData.getWechatMinigameCheckoutCountTotal());
-        result.setWechatMinigameCheckoutAmountTotal(lastReduceData.getWechatMinigameCheckoutAmountTotal());
-        result.setOfficialAccountFollowCountTotal(lastReduceData.getOfficialAccountFollowCountTotal());
-        result.setOfficialAccountFollowRateAll(lastReduceData.getOfficialAccountFollowRateAll());
-        result.setOfficialAccountRegisterUserCountTotal(lastReduceData.getOfficialAccountRegisterUserCountTotal());
-        result.setOfficialAccountRegisterRateAll(lastReduceData.getOfficialAccountRegisterRateAll());
-        result.setOfficialAccountRegisterCostAll(lastReduceData.getOfficialAccountRegisterCostAll());
-        result.setOfficialAccountRegisterAmountTotal(lastReduceData.getOfficialAccountRegisterAmountTotal());
-        result.setOfficialAccountRegisterRoiAll(lastReduceData.getOfficialAccountRegisterRoiAll());
-        result.setOfficialAccountApplyCountTotal(lastReduceData.getOfficialAccountApplyCountTotal());
-        result.setOfficialAccountApplyUserCountTotal(lastReduceData.getOfficialAccountApplyUserCountTotal());
-        result.setOfficialAccountApplyRateAll(lastReduceData.getOfficialAccountApplyRateAll());
-        result.setOfficialAccountApplyCostAll(lastReduceData.getOfficialAccountApplyCostAll());
-        result.setOfficialAccountApplyAmountTotal(lastReduceData.getOfficialAccountApplyAmountTotal());
-        result.setOfficialAccountApplyRoiAll(lastReduceData.getOfficialAccountApplyRoiAll());
-        result.setOfficialAccountOrderCountTotal(lastReduceData.getOfficialAccountOrderCountTotal());
-        result.setOfficialAccountFirstDayOrderCountTotal(lastReduceData.getOfficialAccountFirstDayOrderCountTotal());
-        result.setOfficialAccountOrderUserCountTotal(lastReduceData.getOfficialAccountOrderUserCountTotal());
-        result.setOfficialAccountOrderRateAll(lastReduceData.getOfficialAccountOrderRateAll());
-        result.setOfficialAccountOrderCostAll(lastReduceData.getOfficialAccountOrderCostAll());
-        result.setOfficialAccountOrderAmountTotal(lastReduceData.getOfficialAccountOrderAmountTotal());
-        result.setOfficialAccountFirstDayOrderAmountTotal(lastReduceData.getOfficialAccountFirstDayOrderAmountTotal());
-        result.setOfficialAccountOrderRoiAll(lastReduceData.getOfficialAccountOrderRoiAll());
-        result.setOfficialAccountConsultCountTotal(lastReduceData.getOfficialAccountConsultCountTotal());
-        result.setOfficialAccountReaderCountTotal(lastReduceData.getOfficialAccountReaderCountTotal());
-        result.setOfficialAccountCreditApplyUserCountTotal(lastReduceData.getOfficialAccountCreditApplyUserCountTotal());
-        result.setOfficialAccountCreditUserCountTotal(lastReduceData.getOfficialAccountCreditUserCountTotal());
-        result.setForwardCountTotal(lastReduceData.getForwardCountTotal());
-        result.setForwardUserCountTotal(lastReduceData.getForwardUserCountTotal());
-        result.setNoInterestCountTotal(lastReduceData.getNoInterestCountTotal());
-        result.setNoInterestCountHour(lastReduceData.getNoInterestCountHour());
-
-        if (statDay.equals(lastReduceData.getStatDay())) {
-            result.setCostDeviationRateDay(lastReduceData.getCostDeviationRateDay());
-            result.setCostDay(lastReduceData.getCostDay());
-            result.setCompensationAmountDay(lastReduceData.getCompensationAmountDay());
-            result.setViewCountDay(lastReduceData.getViewCountDay());
-            result.setThousandDisplayPriceDay(lastReduceData.getThousandDisplayPriceDay());
-            result.setValidClickCountDay(lastReduceData.getValidClickCountDay());
-            result.setCtrDay(lastReduceData.getCtrDay());
-            result.setCpcDay(lastReduceData.getCpcDay());
-            result.setValuableClickCountDay(lastReduceData.getValuableClickCountDay());
-            result.setValuableClickRateDay(lastReduceData.getValuableClickRateDay());
-            result.setValuableClickCostDay(lastReduceData.getValuableClickCostDay());
-            result.setConversionsCountDay(lastReduceData.getConversionsCountDay());
-            result.setConversionsCostDay(lastReduceData.getConversionsCostDay());
-            result.setConversionsRateDay(lastReduceData.getConversionsRateDay());
-            result.setDeepConversionsCountDay(lastReduceData.getDeepConversionsCountDay());
-            result.setDeepConversionsCostDay(lastReduceData.getDeepConversionsCostDay());
-            result.setDeepConversionsRateDay(lastReduceData.getDeepConversionsRateDay());
-            result.setOrderCountDay(lastReduceData.getOrderCountDay());
-            result.setFirstDayOrderCountDay(lastReduceData.getFirstDayOrderCountDay());
-            result.setWebOrderCostDay(lastReduceData.getWebOrderCostDay());
-            result.setOrderRateDay(lastReduceData.getOrderRateDay());
-            result.setOrderAmountDay(lastReduceData.getOrderAmountDay());
-            result.setFirstDayOrderAmountDay(lastReduceData.getFirstDayOrderAmountDay());
-            result.setOrderUnitPriceDay(lastReduceData.getOrderUnitPriceDay());
-            result.setOrderRoiDay(lastReduceData.getOrderRoiDay());
-            result.setSignInCountDay(lastReduceData.getSignInCountDay());
-            result.setScanFollowCountDay(lastReduceData.getScanFollowCountDay());
-            result.setWechatAppRegisterUvDay(lastReduceData.getWechatAppRegisterUvDay());
-            result.setWechatMinigameRegisterCostDay(lastReduceData.getWechatMinigameRegisterCostDay());
-            result.setWechatMinigameRegisterRateDay(lastReduceData.getWechatMinigameRegisterRateDay());
-            result.setWechatMinigameArpuDay(lastReduceData.getWechatMinigameArpuDay());
-            result.setWechatMinigameRetentionCountDay(lastReduceData.getWechatMinigameRetentionCountDay());
-            result.setWechatMinigameCheckoutCountDay(lastReduceData.getWechatMinigameCheckoutCountDay());
-            result.setWechatMinigameCheckoutAmountDay(lastReduceData.getWechatMinigameCheckoutAmountDay());
-            result.setOfficialAccountFollowCountDay(lastReduceData.getOfficialAccountFollowCountDay());
-            result.setOfficialAccountFollowRateDay(lastReduceData.getOfficialAccountFollowRateDay());
-            result.setOfficialAccountRegisterUserCountDay(lastReduceData.getOfficialAccountRegisterUserCountDay());
-            result.setOfficialAccountRegisterRateDay(lastReduceData.getOfficialAccountRegisterRateDay());
-            result.setOfficialAccountRegisterCostDay(lastReduceData.getOfficialAccountRegisterCostDay());
-            result.setOfficialAccountRegisterAmountDay(lastReduceData.getOfficialAccountRegisterAmountDay());
-            result.setOfficialAccountRegisterRoiDay(lastReduceData.getOfficialAccountRegisterRoiDay());
-            result.setOfficialAccountApplyCountDay(lastReduceData.getOfficialAccountApplyCountDay());
-            result.setOfficialAccountApplyUserCountDay(lastReduceData.getOfficialAccountApplyUserCountDay());
-            result.setOfficialAccountApplyRateDay(lastReduceData.getOfficialAccountApplyRateDay());
-            result.setOfficialAccountApplyCostDay(lastReduceData.getOfficialAccountApplyCostDay());
-            result.setOfficialAccountApplyAmountDay(lastReduceData.getOfficialAccountApplyAmountDay());
-            result.setOfficialAccountApplyRoiDay(lastReduceData.getOfficialAccountApplyRoiDay());
-            result.setOfficialAccountOrderCountDay(lastReduceData.getOfficialAccountOrderCountDay());
-            result.setOfficialAccountFirstDayOrderCountDay(lastReduceData.getOfficialAccountFirstDayOrderCountDay());
-            result.setOfficialAccountOrderUserCountDay(lastReduceData.getOfficialAccountOrderUserCountDay());
-            result.setOfficialAccountOrderRateDay(lastReduceData.getOfficialAccountOrderRateDay());
-            result.setOfficialAccountOrderCostDay(lastReduceData.getOfficialAccountOrderCostDay());
-            result.setOfficialAccountOrderAmountDay(lastReduceData.getOfficialAccountOrderAmountDay());
-            result.setOfficialAccountFirstDayOrderAmountDay(lastReduceData.getOfficialAccountFirstDayOrderAmountDay());
-            result.setOfficialAccountOrderRoiDay(lastReduceData.getOfficialAccountOrderRoiDay());
-            result.setOfficialAccountConsultCountDay(lastReduceData.getOfficialAccountConsultCountDay());
-            result.setOfficialAccountReaderCountDay(lastReduceData.getOfficialAccountReaderCountDay());
-            result.setOfficialAccountCreditApplyUserCountDay(lastReduceData.getOfficialAccountCreditApplyUserCountDay());
-            result.setOfficialAccountCreditUserCountDay(lastReduceData.getOfficialAccountCreditUserCountDay());
-            result.setForwardCountDay(lastReduceData.getForwardCountDay());
-            result.setForwardUserCountDay(lastReduceData.getForwardUserCountDay());
-            result.setNoInterestCountDay(lastReduceData.getNoInterestCountDay());
-        }
-        return result;
-    }
-}

+ 1 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -31,8 +31,6 @@ import java.util.*;
 public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow> {
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
 
-    private final OutputTag<AdStatOfHourDWD> adHourFromMinuteStreamTag;
-
     private SqlSessionFactory sqlSessionFactory;
     // 历史的天数据
     private ValueState<AdStatOfDayDWD> historyDayState;
@@ -43,8 +41,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
     // 前 5分钟聚合的数据
     private MapState<String, AdStatOfMinuteDWD> lastReduceState;
 
-    public AdMinuteDWDProcess(OutputTag<AdStatOfHourDWD> adHourFromMinuteStreamTag) {
-        this.adHourFromMinuteStreamTag = adHourFromMinuteStreamTag;
+    public AdMinuteDWDProcess() {
     }
 
     @Override
@@ -120,9 +117,6 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         // 聚合当天的全部数据
         AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceData, now);
         collector.collect(newAdStat);
-        if (beginDateTime.getMinute() == 55) {
-            context.output(adHourFromMinuteStreamTag, AdStatOfHourDWD.byMinuteDWD(newAdStat));
-        }
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);