wcc 3 лет назад
Родитель
Сommit
08ffb265f2

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -96,7 +96,7 @@ public class AdHourStreamJob {
         // 分钟流-计算
         SingleOutputStreamOperator<AdStatOfMinuteDWD> adMinuteDWDStream = adMinuteODSStream
                 // 打水印,允许数据延迟 5分钟,同时指定时间流
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(5L))
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofDays(5L))
                         .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adODS, l) -> adODS.getStatTime()))
                 .keyBy(AdDataOfMinuteODS::getAdId)
                 // 开一个 5分钟的滚动窗口

+ 28 - 32
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -11,6 +11,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -25,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -49,7 +54,7 @@ public class Test {
 
         DataStreamSource<Pojo> source = env.addSource(new PojoSource());
         // 打水印,延迟 2秒,同时指定时间流
-        SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(2))
+        SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(3))
                 .withTimestampAssigner((SerializableTimestampAssigner<Pojo>) (pojo, l) -> pojo.getCreateTime())
         );
         pojoStream.keyBy(Pojo::getUserId)
@@ -66,18 +71,14 @@ public class Test {
                      */
                     @Override
                     public TriggerResult onElement(Pojo pojo, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
-                        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
-                            // 到了窗口的最大生命周期
-                            log.error("到了窗口的最大生命周期");
-                            return TriggerResult.FIRE_AND_PURGE;
-                        }
-                        log.error("trigger->onElement: {}, {}, {}, {}, [{} - {}]", JsonUtil.toString(pojo),
+                        /*log.error("trigger->onElement: {}, {}, {}, {}, [{} - {}]", JsonUtil.toString(pojo),
                                 DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.getStart())),
                                 DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.getEnd())),
                                 DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentWatermark())),
                                 DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.maxTimestamp())),
                                 DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentProcessingTime()))
-                        );
+                        );*/
+                        log.error("收到数据:{},eventTime:{}", DateUtil.milliToLocalDateTime(pojo.getCreateTime()), DateUtil.milliToLocalDateTime(time));
                         return TriggerResult.FIRE;
                     }
 
@@ -122,17 +123,21 @@ public class Test {
                     public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
                         log.error("trigger->clear");
                     }
-                })
-                .process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
+                }).process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
+
+                    @Override
+                    public void open(Configuration conf) {
+                    }
+
                     @Override
                     public void process(Integer integer,
                                         ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>.Context context, Iterable<Pojo> iterable,
                                         Collector<Pojo> collector) throws Exception {
-                        for(Pojo pojo : iterable) {
+                        for (Pojo pojo : iterable) {
                             collector.collect(pojo);
                         }
                     }
-                }).print();
+                });//.print();
                 /*.aggregate(new AggregateFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, String>() {
 
                     @Override
@@ -199,9 +204,6 @@ public class Test {
         // 2021-01-01 00:00:00
         private static final long BEGIN = 1609430400000L;
 
-        // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
-        private static final Object DUMMY_LOCK = new Object();
-
         private static AtomicLong index1;
         private static AtomicLong index2;
         private boolean isRun = false;
@@ -210,22 +212,16 @@ public class Test {
         @Override
         public void open(Configuration configuration) {
             isRun = true;
-            if (index1 == null) {
-                synchronized (DUMMY_LOCK) {
-                    if (index1 == null) {
-                        index1 = new AtomicLong(0);
-                        index2 = new AtomicLong(0);
-                        threadPool = new ThreadPoolExecutor(
-                                60,
-                                60,
-                                0L,
-                                TimeUnit.MILLISECONDS,
-                                new LinkedBlockingQueue<>(),
-                                new ThreadFactoryBuilder()
-                                        .setNameFormat("maxcompute-writer-%d").build());
-                    }
-                }
-            }
+            index1 = new AtomicLong(0);
+            index2 = new AtomicLong(0);
+            threadPool = new ThreadPoolExecutor(
+                    60,
+                    60,
+                    0L,
+                    TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<>(),
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("maxcompute-writer-%d").build());
         }
 
         @Override
@@ -242,7 +238,7 @@ public class Test {
                         // 模拟数据延迟,每天 24点的数据延迟 25秒
                         threadPool.execute(() -> {
                             try {
-                                Thread.sleep(25);
+                                Thread.sleep(2000);
                             } catch (InterruptedException e) {
                                 e.printStackTrace();
                             }

+ 170 - 191
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfHourDWD.java

@@ -12,6 +12,7 @@ import org.springframework.beans.BeanUtils;
 
 import java.time.LocalDateTime;
 import java.util.Date;
+import java.util.List;
 
 /**
  * 广告维度的小时统计数据
@@ -1286,201 +1287,179 @@ public class AdStatOfHourDWD {
         return adStatOfHour;
     }
 
-    public static AdStatOfHourDWD reduce(AdStatOfDayDWD yesterdayDWD, AdStatOfHourDWD value1, AdDataOfHourODS value2, long createTime) {
+    public static AdStatOfHourDWD reduce(AdStatOfDayDWD yesterdayDWD, List<AdDataOfHourODS> hourODSList, AdDataOfHourODS hourODS, long createTime) {
         AdStatOfHourDWD result = new AdStatOfHourDWD();
-        BeanUtils.copyProperties(value2, result);
+        BeanUtils.copyProperties(hourODS, result);
         result.setCreateTime(new Date(createTime));
 
-        result.setCostDeviationRateHour(value2.getCostDeviationRate());
-        result.setCostHour(value2.getCost());
-        result.setCompensationAmountHour(value2.getCompensationAmount());
-        result.setViewCountHour(value2.getViewCount());
-        result.setThousandDisplayPriceHour(value2.getThousandDisplayPrice());
-        result.setAvgViewPerUserHour(value2.getAvgViewPerUser());
-        result.setValidClickCountHour(value2.getValidClickCount());
-        result.setCtrHour(value2.getCtr());
-        result.setCpcHour(value2.getCpc());
-        result.setValuableClickCountHour(value2.getValuableClickCount());
-        result.setValuableClickRateHour(value2.getValuableClickRate());
-        result.setValuableClickCostHour(value2.getValuableClickCost());
-        result.setConversionsCountHour(value2.getConversionsCount());
-        result.setConversionsCostHour(value2.getConversionsCost());
-        result.setConversionsRateHour(value2.getConversionsRate());
-        result.setDeepConversionsCountHour(value2.getDeepConversionsCount());
-        result.setDeepConversionsCostHour(value2.getDeepConversionsCost());
-        result.setDeepConversionsRateHour(value2.getDeepConversionsRate());
-        result.setOrderCountHour(value2.getOrderCount());
-        result.setFirstDayOrderCountHour(value2.getFirstDayOrderCount());
-        result.setWebOrderCostHour(value2.getWebOrderCost());
-        result.setOrderRateHour(value2.getOrderRate());
-        result.setOrderAmountHour(value2.getOrderAmount());
-        result.setFirstDayOrderAmountHour(value2.getFirstDayOrderAmount());
-        result.setOrderUnitPriceHour(value2.getOrderUnitPrice());
-        result.setOrderRoiHour(value2.getOrderRoi());
-        result.setSignInCountHour(value2.getSignInCount());
-        result.setScanFollowCountHour(value2.getScanFollowCount());
-        result.setWechatAppRegisterUvHour(value2.getWechatAppRegisterUv());
-        result.setWechatMinigameRegisterCostHour(value2.getWechatMinigameRegisterCost());
-        result.setWechatMinigameRegisterRateHour(value2.getWechatMinigameRegisterRate());
-        result.setWechatMinigameArpuHour(value2.getWechatMinigameArpu());
-        result.setWechatMinigameRetentionCountHour(value2.getWechatMinigameRetentionCount());
-        result.setWechatMinigameCheckoutCountHour(value2.getWechatMinigameCheckoutCount());
-        result.setWechatMinigameCheckoutAmountHour(value2.getWechatMinigameCheckoutAmount());
-        result.setOfficialAccountFollowCountHour(value2.getOfficialAccountFollowCount());
-        result.setOfficialAccountFollowRateHour(value2.getOfficialAccountFollowRate());
-        result.setOfficialAccountRegisterUserCountHour(value2.getOfficialAccountRegisterUserCount());
-        result.setOfficialAccountRegisterRateHour(value2.getOfficialAccountRegisterRate());
-        result.setOfficialAccountRegisterCostHour(value2.getOfficialAccountRegisterCost());
-        result.setOfficialAccountRegisterAmountHour(value2.getOfficialAccountRegisterAmount());
-        result.setOfficialAccountRegisterRoiHour(value2.getOfficialAccountRegisterRoi());
-        result.setOfficialAccountApplyCountHour(value2.getOfficialAccountApplyCount());
-        result.setOfficialAccountApplyUserCountHour(value2.getOfficialAccountApplyUserCount());
-        result.setOfficialAccountApplyRateHour(value2.getOfficialAccountApplyRate());
-        result.setOfficialAccountApplyCostHour(value2.getOfficialAccountApplyCost());
-        result.setOfficialAccountApplyAmountHour(value2.getOfficialAccountApplyAmount());
-        result.setOfficialAccountApplyRoiHour(value2.getOfficialAccountApplyRoi());
-        result.setOfficialAccountOrderCountHour(value2.getOfficialAccountOrderCount());
-        result.setOfficialAccountFirstDayOrderCountHour(value2.getOfficialAccountFirstDayOrderCount());
-        result.setOfficialAccountOrderUserCountHour(value2.getOfficialAccountOrderUserCount());
-        result.setOfficialAccountOrderRateHour(value2.getOfficialAccountOrderRate());
-        result.setOfficialAccountOrderCostHour(value2.getOfficialAccountOrderCost());
-        result.setOfficialAccountOrderAmountHour(value2.getOfficialAccountOrderAmount());
-        result.setOfficialAccountFirstDayOrderAmountHour(value2.getOfficialAccountFirstDayOrderAmount());
-        result.setOfficialAccountOrderRoiHour(value2.getOfficialAccountOrderRoi());
-        result.setOfficialAccountConsultCountHour(value2.getOfficialAccountConsultCount());
-        result.setOfficialAccountReaderCountHour(value2.getOfficialAccountReaderCount());
-        result.setOfficialAccountCreditApplyUserCountHour(value2.getOfficialAccountCreditApplyUserCount());
-        result.setOfficialAccountCreditUserCountHour(value2.getOfficialAccountCreditUserCount());
-        result.setForwardCountHour(value2.getForwardCount());
-        result.setForwardUserCountHour(value2.getForwardUserCount());
-        result.setNoInterestCountHour(value2.getNoInterestCount());
-        if (value1 == null) {
-            result.setCostDeviationRateDay(value2.getCostDeviationRate());
-            result.setCostDay(value2.getCost());
-            result.setCompensationAmountDay(value2.getCompensationAmount());
-            result.setViewCountDay(value2.getViewCount());
-            result.setThousandDisplayPriceDay(value2.getThousandDisplayPrice());
-            result.setValidClickCountDay(value2.getValidClickCount());
-            result.setCtrDay(value2.getCtr());
-            result.setCpcDay(value2.getCpc());
-            result.setValuableClickCountDay(value2.getValuableClickCount());
-            result.setValuableClickRateDay(value2.getValuableClickRate());
-            result.setValuableClickCostDay(value2.getValuableClickCost());
-            result.setConversionsCountDay(value2.getConversionsCount());
-            result.setConversionsCostDay(value2.getConversionsCost());
-            result.setConversionsRateDay(value2.getConversionsRate());
-            result.setDeepConversionsCountDay(value2.getDeepConversionsCount());
-            result.setDeepConversionsCostDay(value2.getDeepConversionsCost());
-            result.setDeepConversionsRateDay(value2.getDeepConversionsRate());
-            result.setOrderCountDay(value2.getOrderCount());
-            result.setFirstDayOrderCountDay(value2.getFirstDayOrderCount());
-            result.setWebOrderCostDay(value2.getWebOrderCost());
-            result.setOrderRateDay(value2.getOrderRate());
-            result.setOrderAmountDay(value2.getOrderAmount());
-            result.setFirstDayOrderAmountDay(value2.getFirstDayOrderAmount());
-            result.setOrderUnitPriceDay(value2.getOrderUnitPrice());
-            result.setOrderRoiDay(value2.getOrderRoi());
-            result.setSignInCountDay(value2.getSignInCount());
-            result.setScanFollowCountDay(value2.getScanFollowCount());
-            result.setWechatAppRegisterUvDay(value2.getWechatAppRegisterUv());
-            result.setWechatMinigameRegisterCostDay(value2.getWechatMinigameRegisterCost());
-            result.setWechatMinigameRegisterRateDay(value2.getWechatMinigameRegisterRate());
-            result.setWechatMinigameArpuDay(value2.getWechatMinigameArpu());
-            result.setWechatMinigameRetentionCountDay(value2.getWechatMinigameRetentionCount());
-            result.setWechatMinigameCheckoutCountDay(value2.getWechatMinigameCheckoutCount());
-            result.setWechatMinigameCheckoutAmountDay(value2.getWechatMinigameCheckoutAmount());
-            result.setOfficialAccountFollowCountDay(value2.getOfficialAccountFollowCount());
-            result.setOfficialAccountFollowRateDay(value2.getOfficialAccountFollowRate());
-            result.setOfficialAccountRegisterUserCountDay(value2.getOfficialAccountRegisterUserCount());
-            result.setOfficialAccountRegisterRateDay(value2.getOfficialAccountRegisterRate());
-            result.setOfficialAccountRegisterCostDay(value2.getOfficialAccountRegisterCost());
-            result.setOfficialAccountRegisterAmountDay(value2.getOfficialAccountRegisterAmount());
-            result.setOfficialAccountRegisterRoiDay(value2.getOfficialAccountRegisterRoi());
-            result.setOfficialAccountApplyCountDay(value2.getOfficialAccountApplyCount());
-            result.setOfficialAccountApplyUserCountDay(value2.getOfficialAccountApplyUserCount());
-            result.setOfficialAccountApplyRateDay(value2.getOfficialAccountApplyRate());
-            result.setOfficialAccountApplyCostDay(value2.getOfficialAccountApplyCost());
-            result.setOfficialAccountApplyAmountDay(value2.getOfficialAccountApplyAmount());
-            result.setOfficialAccountApplyRoiDay(value2.getOfficialAccountApplyRoi());
-            result.setOfficialAccountOrderCountDay(value2.getOfficialAccountOrderCount());
-            result.setOfficialAccountFirstDayOrderCountDay(value2.getOfficialAccountFirstDayOrderCount());
-            result.setOfficialAccountOrderUserCountDay(value2.getOfficialAccountOrderUserCount());
-            result.setOfficialAccountOrderRateDay(value2.getOfficialAccountOrderRate());
-            result.setOfficialAccountOrderCostDay(value2.getOfficialAccountOrderCost());
-            result.setOfficialAccountOrderAmountDay(value2.getOfficialAccountOrderAmount());
-            result.setOfficialAccountFirstDayOrderAmountDay(value2.getOfficialAccountFirstDayOrderAmount());
-            result.setOfficialAccountOrderRoiDay(value2.getOfficialAccountOrderRoi());
-            result.setOfficialAccountConsultCountDay(value2.getOfficialAccountConsultCount());
-            result.setOfficialAccountReaderCountDay(value2.getOfficialAccountReaderCount());
-            result.setOfficialAccountCreditApplyUserCountDay(value2.getOfficialAccountCreditApplyUserCount());
-            result.setOfficialAccountCreditUserCountDay(value2.getOfficialAccountCreditUserCount());
-            result.setForwardCountDay(value2.getForwardCount());
-            result.setForwardUserCountDay(value2.getForwardUserCount());
-            result.setNoInterestCountDay(value2.getNoInterestCount());
-        } else {
-            result.setCostDeviationRateDay(value1.getCostDeviationRateDay() + value2.getCostDeviationRate());
-            result.setCostDay(value1.getCostDay() + value2.getCost());
-            result.setCompensationAmountDay(value1.getCompensationAmountDay() + value2.getCompensationAmount());
-            result.setViewCountDay(value1.getViewCountDay() + value2.getViewCount());
-            result.setThousandDisplayPriceDay(result.getViewCountDay() == 0 ? 0 : (result.getCostDay() / result.getViewCountDay() * 1000));
-            result.setValidClickCountDay(value1.getValidClickCountDay() + value2.getValidClickCount());
-            result.setCtrDay(result.getViewCountDay() == 0 ? 0.0 : result.getValidClickCountDay() / result.getViewCountDay());
-            result.setCpcDay(result.getValidClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValidClickCountDay());
-            result.setValuableClickCountDay(value1.getValuableClickCountDay() + value2.getValuableClickCount());
-            result.setValuableClickRateDay(result.getViewCountDay() == 0 ? 0.0 : result.getValuableClickCountDay() / result.getViewCountDay());
-            result.setValuableClickCostDay(result.getValuableClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValuableClickCountDay());
-            result.setConversionsCountDay(value1.getConversionsCountDay() + value2.getConversionsCount());
-            result.setConversionsCostDay(result.getConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getConversionsCountDay());
-            result.setConversionsRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getConversionsCountDay() / result.getValidClickCountDay());
-            result.setDeepConversionsCountDay(value1.getDeepConversionsCountDay() + value2.getDeepConversionsCount());
-            result.setDeepConversionsCostDay(result.getDeepConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getDeepConversionsCountDay());
-            result.setDeepConversionsRateDay(result.getValuableClickCountDay() == 0 ? 0.0 : result.getDeepConversionsCountDay() / result.getValuableClickCountDay());
-            result.setOrderCountDay(value1.getOrderCountDay() + value2.getOrderCount());
-            result.setFirstDayOrderCountDay(value1.getFirstDayOrderCountDay() + value2.getFirstDayOrderCount());
-            result.setWebOrderCostDay(result.getOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOrderCountDay());
-            result.setOrderRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOrderCountDay() / result.getValidClickCountDay());
-            result.setOrderAmountDay(value1.getOrderAmountDay() + value2.getOrderAmount());
-            result.setFirstDayOrderAmountDay(value1.getFirstDayOrderAmountDay() + value2.getFirstDayOrderAmount());
-            result.setOrderUnitPriceDay(result.getOrderCountDay() == 0 ? 0 : result.getOrderAmountDay() / result.getOrderCountDay());
-            result.setOrderRoiDay(result.getCostDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getCostDay());
-            result.setSignInCountDay(value1.getSignInCountDay() + value2.getSignInCount());
-            result.setScanFollowCountDay(value1.getScanFollowCountDay() + value2.getScanFollowCount());
-            result.setWechatAppRegisterUvDay(value1.getWechatAppRegisterUvDay() + value2.getWechatAppRegisterUv());
-            result.setWechatMinigameRegisterCostDay(result.getWechatAppRegisterUvDay() == 0 ? 0 : result.getCostDay() / result.getWechatAppRegisterUvDay());
-            result.setWechatMinigameRegisterRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getWechatAppRegisterUvDay() / result.getValidClickCountDay());
-            result.setWechatMinigameArpuDay(result.getWechatAppRegisterUvDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getWechatAppRegisterUvDay());
-            result.setWechatMinigameRetentionCountDay(value1.getWechatMinigameRetentionCountDay() + value2.getWechatMinigameRetentionCount());
-            result.setWechatMinigameCheckoutCountDay(value1.getWechatMinigameCheckoutCountDay() + value2.getWechatMinigameCheckoutCount());
-            result.setWechatMinigameCheckoutAmountDay(value1.getWechatMinigameCheckoutAmountDay() + value2.getWechatMinigameCheckoutAmount());
-            result.setOfficialAccountFollowCountDay(value1.getOfficialAccountFollowCountDay() + value2.getOfficialAccountFollowCount());
-            result.setOfficialAccountFollowRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOfficialAccountFollowCountDay() / result.getValidClickCountDay());
-            result.setOfficialAccountRegisterUserCountDay(value1.getOfficialAccountRegisterUserCountDay() + value2.getOfficialAccountRegisterUserCount());
-            result.setOfficialAccountRegisterRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountRegisterUserCountDay() / result.getOfficialAccountFollowCountDay());
-            result.setOfficialAccountRegisterCostDay(result.getOfficialAccountRegisterUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountRegisterUserCountDay());
-            result.setOfficialAccountRegisterAmountDay(value1.getOfficialAccountRegisterAmountDay() + value2.getOfficialAccountRegisterAmount());
-            result.setOfficialAccountRegisterRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountRegisterAmountDay() / result.getCostDay());
-            result.setOfficialAccountApplyCountDay(value1.getOfficialAccountApplyCountDay() + value2.getOfficialAccountApplyCount());
-            result.setOfficialAccountApplyUserCountDay(value1.getOfficialAccountApplyUserCountDay() + value2.getOfficialAccountApplyUserCount());
-            result.setOfficialAccountApplyRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountApplyUserCountDay() / result.getOfficialAccountFollowCountDay());
-            result.setOfficialAccountApplyCostDay(result.getOfficialAccountApplyUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountApplyUserCountDay());
-            result.setOfficialAccountApplyAmountDay(value1.getOfficialAccountApplyAmountDay() + value2.getOfficialAccountApplyAmount());
-            result.setOfficialAccountApplyRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountApplyAmountDay() / result.getCostDay());
-            result.setOfficialAccountOrderCountDay(value1.getOfficialAccountOrderCountDay() + value2.getOfficialAccountOrderCount());
-            result.setOfficialAccountFirstDayOrderCountDay(value1.getOfficialAccountFirstDayOrderCountDay() + value2.getOfficialAccountFirstDayOrderCount());
-            result.setOfficialAccountOrderUserCountDay(value1.getOfficialAccountOrderUserCountDay() + value2.getOfficialAccountOrderUserCount());
-            result.setOfficialAccountOrderRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountOrderUserCountDay() / result.getOfficialAccountFollowCountDay());
-            result.setOfficialAccountOrderCostDay(result.getOfficialAccountOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountOrderCountDay());
-            result.setOfficialAccountOrderAmountDay(value1.getOfficialAccountOrderAmountDay() + value2.getOfficialAccountOrderAmount());
-            result.setOfficialAccountFirstDayOrderAmountDay(value1.getOfficialAccountFirstDayOrderAmountDay() + value2.getOfficialAccountFirstDayOrderAmount());
-            result.setOfficialAccountOrderRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountOrderAmountDay() / result.getCostDay());
-            result.setOfficialAccountConsultCountDay(value1.getOfficialAccountConsultCountDay() + value2.getOfficialAccountConsultCount());
-            result.setOfficialAccountReaderCountDay(value1.getOfficialAccountReaderCountDay() + value2.getOfficialAccountReaderCount());
-            result.setOfficialAccountCreditApplyUserCountDay(value1.getOfficialAccountCreditApplyUserCountDay() + value2.getOfficialAccountCreditApplyUserCount());
-            result.setOfficialAccountCreditUserCountDay(value1.getOfficialAccountCreditUserCountDay() + value2.getOfficialAccountCreditUserCount());
-            result.setForwardCountDay(value1.getForwardCountDay() + value2.getForwardCount());
-            result.setForwardUserCountDay(value1.getForwardUserCountDay() + value2.getForwardUserCount());
-            result.setNoInterestCountDay(value1.getNoInterestCountDay() + value2.getNoInterestCount());
+        // 小时数据
+        result.setCostDeviationRateHour(hourODS.getCostDeviationRate());
+        result.setCostHour(hourODS.getCost());
+        result.setCompensationAmountHour(hourODS.getCompensationAmount());
+        result.setViewCountHour(hourODS.getViewCount());
+        result.setThousandDisplayPriceHour(hourODS.getThousandDisplayPrice());
+        result.setAvgViewPerUserHour(hourODS.getAvgViewPerUser());
+        result.setValidClickCountHour(hourODS.getValidClickCount());
+        result.setCtrHour(hourODS.getCtr());
+        result.setCpcHour(hourODS.getCpc());
+        result.setValuableClickCountHour(hourODS.getValuableClickCount());
+        result.setValuableClickRateHour(hourODS.getValuableClickRate());
+        result.setValuableClickCostHour(hourODS.getValuableClickCost());
+        result.setConversionsCountHour(hourODS.getConversionsCount());
+        result.setConversionsCostHour(hourODS.getConversionsCost());
+        result.setConversionsRateHour(hourODS.getConversionsRate());
+        result.setDeepConversionsCountHour(hourODS.getDeepConversionsCount());
+        result.setDeepConversionsCostHour(hourODS.getDeepConversionsCost());
+        result.setDeepConversionsRateHour(hourODS.getDeepConversionsRate());
+        result.setOrderCountHour(hourODS.getOrderCount());
+        result.setFirstDayOrderCountHour(hourODS.getFirstDayOrderCount());
+        result.setWebOrderCostHour(hourODS.getWebOrderCost());
+        result.setOrderRateHour(hourODS.getOrderRate());
+        result.setOrderAmountHour(hourODS.getOrderAmount());
+        result.setFirstDayOrderAmountHour(hourODS.getFirstDayOrderAmount());
+        result.setOrderUnitPriceHour(hourODS.getOrderUnitPrice());
+        result.setOrderRoiHour(hourODS.getOrderRoi());
+        result.setSignInCountHour(hourODS.getSignInCount());
+        result.setScanFollowCountHour(hourODS.getScanFollowCount());
+        result.setWechatAppRegisterUvHour(hourODS.getWechatAppRegisterUv());
+        result.setWechatMinigameRegisterCostHour(hourODS.getWechatMinigameRegisterCost());
+        result.setWechatMinigameRegisterRateHour(hourODS.getWechatMinigameRegisterRate());
+        result.setWechatMinigameArpuHour(hourODS.getWechatMinigameArpu());
+        result.setWechatMinigameRetentionCountHour(hourODS.getWechatMinigameRetentionCount());
+        result.setWechatMinigameCheckoutCountHour(hourODS.getWechatMinigameCheckoutCount());
+        result.setWechatMinigameCheckoutAmountHour(hourODS.getWechatMinigameCheckoutAmount());
+        result.setOfficialAccountFollowCountHour(hourODS.getOfficialAccountFollowCount());
+        result.setOfficialAccountFollowRateHour(hourODS.getOfficialAccountFollowRate());
+        result.setOfficialAccountRegisterUserCountHour(hourODS.getOfficialAccountRegisterUserCount());
+        result.setOfficialAccountRegisterRateHour(hourODS.getOfficialAccountRegisterRate());
+        result.setOfficialAccountRegisterCostHour(hourODS.getOfficialAccountRegisterCost());
+        result.setOfficialAccountRegisterAmountHour(hourODS.getOfficialAccountRegisterAmount());
+        result.setOfficialAccountRegisterRoiHour(hourODS.getOfficialAccountRegisterRoi());
+        result.setOfficialAccountApplyCountHour(hourODS.getOfficialAccountApplyCount());
+        result.setOfficialAccountApplyUserCountHour(hourODS.getOfficialAccountApplyUserCount());
+        result.setOfficialAccountApplyRateHour(hourODS.getOfficialAccountApplyRate());
+        result.setOfficialAccountApplyCostHour(hourODS.getOfficialAccountApplyCost());
+        result.setOfficialAccountApplyAmountHour(hourODS.getOfficialAccountApplyAmount());
+        result.setOfficialAccountApplyRoiHour(hourODS.getOfficialAccountApplyRoi());
+        result.setOfficialAccountOrderCountHour(hourODS.getOfficialAccountOrderCount());
+        result.setOfficialAccountFirstDayOrderCountHour(hourODS.getOfficialAccountFirstDayOrderCount());
+        result.setOfficialAccountOrderUserCountHour(hourODS.getOfficialAccountOrderUserCount());
+        result.setOfficialAccountOrderRateHour(hourODS.getOfficialAccountOrderRate());
+        result.setOfficialAccountOrderCostHour(hourODS.getOfficialAccountOrderCost());
+        result.setOfficialAccountOrderAmountHour(hourODS.getOfficialAccountOrderAmount());
+        result.setOfficialAccountFirstDayOrderAmountHour(hourODS.getOfficialAccountFirstDayOrderAmount());
+        result.setOfficialAccountOrderRoiHour(hourODS.getOfficialAccountOrderRoi());
+        result.setOfficialAccountConsultCountHour(hourODS.getOfficialAccountConsultCount());
+        result.setOfficialAccountReaderCountHour(hourODS.getOfficialAccountReaderCount());
+        result.setOfficialAccountCreditApplyUserCountHour(hourODS.getOfficialAccountCreditApplyUserCount());
+        result.setOfficialAccountCreditUserCountHour(hourODS.getOfficialAccountCreditUserCount());
+        result.setForwardCountHour(hourODS.getForwardCount());
+        result.setForwardUserCountHour(hourODS.getForwardUserCount());
+        result.setNoInterestCountHour(hourODS.getNoInterestCount());
+
+        // 聚合天数据
+        result.setCostDeviationRateDay(0.0);
+        result.setCostDay(0L);
+        result.setCompensationAmountDay(0L);
+        result.setViewCountDay(0L);
+        result.setValidClickCountDay(0L);
+        result.setValuableClickCountDay(0L);
+        result.setConversionsCountDay(0L);
+        result.setDeepConversionsCountDay(0L);
+        result.setOrderCountDay(0L);
+        result.setFirstDayOrderCountDay(0L);
+        result.setOrderAmountDay(0L);
+        result.setFirstDayOrderAmountDay(0L);
+        result.setSignInCountDay(0L);
+        result.setScanFollowCountDay(0L);
+        result.setWechatAppRegisterUvDay(0L);
+        result.setWechatMinigameRetentionCountDay(0L);
+        result.setWechatMinigameCheckoutCountDay(0L);
+        result.setWechatMinigameCheckoutAmountDay(0L);
+        result.setOfficialAccountFollowCountDay(0L);
+        result.setOfficialAccountRegisterUserCountDay(0L);
+        result.setOfficialAccountRegisterAmountDay(0L);
+        result.setOfficialAccountApplyCountDay(0L);
+        result.setOfficialAccountApplyUserCountDay(0L);
+        result.setOfficialAccountApplyAmountDay(0L);
+        result.setOfficialAccountOrderCountDay(0L);
+        result.setOfficialAccountFirstDayOrderCountDay(0L);
+        result.setOfficialAccountOrderUserCountDay(0L);
+        result.setOfficialAccountOrderAmountDay(0L);
+        result.setOfficialAccountFirstDayOrderAmountDay(0L);
+        result.setOfficialAccountConsultCountDay(0L);
+        result.setOfficialAccountReaderCountDay(0L);
+        result.setOfficialAccountCreditApplyUserCountDay(0L);
+        result.setOfficialAccountCreditUserCountDay(0L);
+        result.setForwardCountDay(0L);
+        result.setForwardUserCountDay(0L);
+        result.setNoInterestCountDay(0L);
+        for (AdDataOfHourODS ods : hourODSList) {
+            result.setCostDeviationRateDay(result.getCostDeviationRateDay() + ods.getCostDeviationRate());
+            result.setCostDay(result.getCostDay() + ods.getCost());
+            result.setCompensationAmountDay(result.getCompensationAmountDay() + ods.getCompensationAmount());
+            result.setViewCountDay(result.getViewCountDay() + ods.getViewCount());
+            result.setValidClickCountDay(result.getValidClickCountDay() + ods.getValidClickCount());
+            result.setValuableClickCountDay(result.getValuableClickCountDay() + ods.getValuableClickCount());
+            result.setConversionsCountDay(result.getConversionsCountDay() + ods.getConversionsCount());
+            result.setDeepConversionsCountDay(result.getDeepConversionsCountDay() + ods.getDeepConversionsCount());
+            result.setOrderCountDay(result.getOrderCountDay() + ods.getOrderCount());
+            result.setFirstDayOrderCountDay(result.getFirstDayOrderCountDay() + ods.getFirstDayOrderCount());
+            result.setOrderAmountDay(result.getOrderAmountDay() + ods.getOrderAmount());
+            result.setFirstDayOrderAmountDay(result.getFirstDayOrderAmountDay() + ods.getFirstDayOrderAmount());
+            result.setSignInCountDay(result.getSignInCountDay() + ods.getSignInCount());
+            result.setScanFollowCountDay(result.getScanFollowCountDay() + ods.getScanFollowCount());
+            result.setWechatAppRegisterUvDay(result.getWechatAppRegisterUvDay() + ods.getWechatAppRegisterUv());
+            result.setWechatMinigameRetentionCountDay(result.getWechatMinigameRetentionCountDay() + ods.getWechatMinigameRetentionCount());
+            result.setWechatMinigameCheckoutCountDay(result.getWechatMinigameCheckoutCountDay() + ods.getWechatMinigameCheckoutCount());
+            result.setWechatMinigameCheckoutAmountDay(result.getWechatMinigameCheckoutAmountDay() + ods.getWechatMinigameCheckoutAmount());
+            result.setOfficialAccountFollowCountDay(result.getOfficialAccountFollowCountDay() + ods.getOfficialAccountFollowCount());
+            result.setOfficialAccountRegisterUserCountDay(result.getOfficialAccountRegisterUserCountDay() + ods.getOfficialAccountRegisterUserCount());
+            result.setOfficialAccountRegisterAmountDay(result.getOfficialAccountRegisterAmountDay() + ods.getOfficialAccountRegisterAmount());
+            result.setOfficialAccountApplyCountDay(result.getOfficialAccountApplyCountDay() + ods.getOfficialAccountApplyCount());
+            result.setOfficialAccountApplyUserCountDay(result.getOfficialAccountApplyUserCountDay() + ods.getOfficialAccountApplyUserCount());
+            result.setOfficialAccountApplyAmountDay(result.getOfficialAccountApplyAmountDay() + ods.getOfficialAccountApplyAmount());
+            result.setOfficialAccountOrderCountDay(result.getOfficialAccountOrderCountDay() + ods.getOfficialAccountOrderCount());
+            result.setOfficialAccountFirstDayOrderCountDay(result.getOfficialAccountFirstDayOrderCountDay() + ods.getOfficialAccountFirstDayOrderCount());
+            result.setOfficialAccountOrderUserCountDay(result.getOfficialAccountOrderUserCountDay() + ods.getOfficialAccountOrderUserCount());
+            result.setOfficialAccountOrderAmountDay(result.getOfficialAccountOrderAmountDay() + ods.getOfficialAccountOrderAmount());
+            result.setOfficialAccountFirstDayOrderAmountDay(result.getOfficialAccountFirstDayOrderAmountDay() + ods.getOfficialAccountFirstDayOrderAmount());
+            result.setOfficialAccountConsultCountDay(result.getOfficialAccountConsultCountDay() + ods.getOfficialAccountConsultCount());
+            result.setOfficialAccountReaderCountDay(result.getOfficialAccountReaderCountDay() + ods.getOfficialAccountReaderCount());
+            result.setOfficialAccountCreditApplyUserCountDay(result.getOfficialAccountCreditApplyUserCountDay() + ods.getOfficialAccountCreditApplyUserCount());
+            result.setOfficialAccountCreditUserCountDay(result.getOfficialAccountCreditUserCountDay() + ods.getOfficialAccountCreditUserCount());
+            result.setForwardCountDay(result.getForwardCountDay() + ods.getForwardCount());
+            result.setForwardUserCountDay(result.getForwardUserCountDay() + ods.getForwardUserCount());
+            result.setNoInterestCountDay(result.getNoInterestCountDay() + ods.getNoInterestCount());
         }
+        result.setThousandDisplayPriceDay(result.getViewCountDay() == 0 ? 0 : (result.getCostDay() / result.getViewCountDay() * 1000));
+        result.setCtrDay(result.getViewCountDay() == 0 ? 0.0 : result.getValidClickCountDay() / result.getViewCountDay());
+        result.setCpcDay(result.getValidClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValidClickCountDay());
+        result.setValuableClickRateDay(result.getViewCountDay() == 0 ? 0.0 : result.getValuableClickCountDay() / result.getViewCountDay());
+        result.setValuableClickCostDay(result.getValuableClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValuableClickCountDay());
+        result.setConversionsCostDay(result.getConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getConversionsCountDay());
+        result.setConversionsRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getConversionsCountDay() / result.getValidClickCountDay());
+        result.setDeepConversionsCostDay(result.getDeepConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getDeepConversionsCountDay());
+        result.setDeepConversionsRateDay(result.getValuableClickCountDay() == 0 ? 0.0 : result.getDeepConversionsCountDay() / result.getValuableClickCountDay());
+        result.setWebOrderCostDay(result.getOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOrderCountDay());
+        result.setOrderRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOrderCountDay() / result.getValidClickCountDay());
+        result.setOrderUnitPriceDay(result.getOrderCountDay() == 0 ? 0 : result.getOrderAmountDay() / result.getOrderCountDay());
+        result.setOrderRoiDay(result.getCostDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getCostDay());
+        result.setWechatMinigameRegisterCostDay(result.getWechatAppRegisterUvDay() == 0 ? 0 : result.getCostDay() / result.getWechatAppRegisterUvDay());
+        result.setWechatMinigameRegisterRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getWechatAppRegisterUvDay() / result.getValidClickCountDay());
+        result.setWechatMinigameArpuDay(result.getWechatAppRegisterUvDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getWechatAppRegisterUvDay());
+        result.setOfficialAccountFollowRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOfficialAccountFollowCountDay() / result.getValidClickCountDay());
+        result.setOfficialAccountRegisterRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountRegisterUserCountDay() / result.getOfficialAccountFollowCountDay());
+        result.setOfficialAccountRegisterCostDay(result.getOfficialAccountRegisterUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountRegisterUserCountDay());
+        result.setOfficialAccountRegisterRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountRegisterAmountDay() / result.getCostDay());
+        result.setOfficialAccountApplyRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountApplyUserCountDay() / result.getOfficialAccountFollowCountDay());
+        result.setOfficialAccountApplyCostDay(result.getOfficialAccountApplyUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountApplyUserCountDay());
+        result.setOfficialAccountApplyRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountApplyAmountDay() / result.getCostDay());
+        result.setOfficialAccountOrderRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountOrderUserCountDay() / result.getOfficialAccountFollowCountDay());
+        result.setOfficialAccountOrderCostDay(result.getOfficialAccountOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountOrderCountDay());
+        result.setOfficialAccountOrderRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountOrderAmountDay() / result.getCostDay());
+
+        // 聚合总数据
         if (yesterdayDWD == null) {
             result.setCostDeviationRateTotal(result.getCostDeviationRateDay());
             result.setCostTotal(result.getCostDay());

+ 210 - 106
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfMinuteDWD.java

@@ -1592,16 +1592,158 @@ public class AdStatOfMinuteDWD {
     @SerializedName("no_interest_count_minute")
     private Long noInterestCountMinute;
 
+    public static void resetHourAndMinute(AdStatOfMinuteDWD result) {
+        result.setCostDeviationRateHour(0.0);
+        result.setCostDeviationRateMinute(0.0);
+        result.setCostHour(0L);
+        result.setCostMinute(0L);
+        result.setCompensationAmountHour(0L);
+        result.setCompensationAmountMinute(0L);
+        result.setViewCountHour(0L);
+        result.setViewCountMinute(0L);
+        result.setThousandDisplayPriceAll(0L);
+        result.setThousandDisplayPriceHour(0L);
+        result.setThousandDisplayPriceMinute(0L);
+        result.setAvgViewPerUserHour(0.0);
+        result.setAvgViewPerUserMinute(0.0);
+        result.setValidClickCountHour(0L);
+        result.setValidClickCountMinute(0L);
+        result.setCtrHour(0.0);
+        result.setCtrMinute(0.0);
+        result.setCpcAll(0L);
+        result.setCpcHour(0L);
+        result.setCpcMinute(0L);
+        result.setValuableClickCountHour(0L);
+        result.setValuableClickCountMinute(0L);
+        result.setValuableClickRateHour(0.0);
+        result.setValuableClickRateMinute(0.0);
+        result.setValuableClickCostAll(0L);
+        result.setValuableClickCostHour(0L);
+        result.setValuableClickCostMinute(0L);
+        result.setConversionsCountHour(0L);
+        result.setConversionsCountMinute(0L);
+        result.setConversionsCostAll(0L);
+        result.setConversionsCostHour(0L);
+        result.setConversionsCostMinute(0L);
+        result.setConversionsRateHour(0.0);
+        result.setConversionsRateMinute(0.0);
+        result.setDeepConversionsCountHour(0L);
+        result.setDeepConversionsCountMinute(0L);
+        result.setDeepConversionsCostAll(0L);
+        result.setDeepConversionsCostHour(0L);
+        result.setDeepConversionsCostMinute(0L);
+        result.setDeepConversionsRateHour(0.0);
+        result.setDeepConversionsRateMinute(0.0);
+        result.setOrderCountHour(0L);
+        result.setOrderCountMinute(0L);
+        result.setFirstDayOrderCountHour(0L);
+        result.setFirstDayOrderCountMinute(0L);
+        result.setWebOrderCostAll(0L);
+        result.setWebOrderCostHour(0L);
+        result.setWebOrderCostMinute(0L);
+        result.setOrderRateHour(0.0);
+        result.setOrderRateMinute(0.0);
+        result.setOrderAmountHour(0L);
+        result.setOrderAmountMinute(0L);
+        result.setFirstDayOrderAmountHour(0L);
+        result.setFirstDayOrderAmountMinute(0L);
+        result.setOrderUnitPriceAll(0L);
+        result.setOrderUnitPriceHour(0L);
+        result.setOrderUnitPriceMinute(0L);
+        result.setOrderRoiHour(0.0);
+        result.setOrderRoiMinute(0.0);
+        result.setSignInCountHour(0L);
+        result.setSignInCountMinute(0L);
+        result.setScanFollowCountHour(0L);
+        result.setScanFollowCountMinute(0L);
+        result.setWechatAppRegisterUvHour(0L);
+        result.setWechatAppRegisterUvMinute(0L);
+        result.setWechatMinigameRegisterCostAll(0L);
+        result.setWechatMinigameRegisterCostHour(0L);
+        result.setWechatMinigameRegisterCostMinute(0L);
+        result.setWechatMinigameRegisterRateHour(0.0);
+        result.setWechatMinigameRegisterRateMinute(0.0);
+        result.setWechatMinigameArpuHour(0.0);
+        result.setWechatMinigameArpuMinute(0.0);
+        result.setWechatMinigameRetentionCountHour(0L);
+        result.setWechatMinigameRetentionCountMinute(0L);
+        result.setWechatMinigameCheckoutCountHour(0L);
+        result.setWechatMinigameCheckoutCountMinute(0L);
+        result.setWechatMinigameCheckoutAmountHour(0L);
+        result.setWechatMinigameCheckoutAmountMinute(0L);
+        result.setOfficialAccountFollowCountHour(0L);
+        result.setOfficialAccountFollowCountMinute(0L);
+        result.setOfficialAccountFollowRateHour(0.0);
+        result.setOfficialAccountFollowRateMinute(0.0);
+        result.setOfficialAccountRegisterUserCountHour(0L);
+        result.setOfficialAccountRegisterUserCountMinute(0L);
+        result.setOfficialAccountRegisterRateHour(0.0);
+        result.setOfficialAccountRegisterRateMinute(0.0);
+        result.setOfficialAccountRegisterCostAll(0L);
+        result.setOfficialAccountRegisterCostHour(0L);
+        result.setOfficialAccountRegisterCostMinute(0L);
+        result.setOfficialAccountRegisterAmountHour(0L);
+        result.setOfficialAccountRegisterAmountMinute(0L);
+        result.setOfficialAccountRegisterRoiAll(0L);
+        result.setOfficialAccountRegisterRoiHour(0L);
+        result.setOfficialAccountRegisterRoiMinute(0L);
+        result.setOfficialAccountApplyCountHour(0L);
+        result.setOfficialAccountApplyCountMinute(0L);
+        result.setOfficialAccountApplyUserCountHour(0L);
+        result.setOfficialAccountApplyUserCountMinute(0L);
+        result.setOfficialAccountApplyRateHour(0.0);
+        result.setOfficialAccountApplyRateMinute(0.0);
+        result.setOfficialAccountApplyCostAll(0L);
+        result.setOfficialAccountApplyCostHour(0L);
+        result.setOfficialAccountApplyCostMinute(0L);
+        result.setOfficialAccountApplyAmountHour(0L);
+        result.setOfficialAccountApplyAmountMinute(0L);
+        result.setOfficialAccountApplyRoiAll(0L);
+        result.setOfficialAccountApplyRoiHour(0L);
+        result.setOfficialAccountApplyRoiMinute(0L);
+        result.setOfficialAccountOrderCountHour(0L);
+        result.setOfficialAccountOrderCountMinute(0L);
+        result.setOfficialAccountFirstDayOrderCountHour(0L);
+        result.setOfficialAccountFirstDayOrderCountMinute(0L);
+        result.setOfficialAccountOrderUserCountHour(0L);
+        result.setOfficialAccountOrderUserCountMinute(0L);
+        result.setOfficialAccountOrderRateHour(0.0);
+        result.setOfficialAccountOrderRateMinute(0.0);
+        result.setOfficialAccountOrderCostAll(0L);
+        result.setOfficialAccountOrderCostHour(0L);
+        result.setOfficialAccountOrderCostMinute(0L);
+        result.setOfficialAccountOrderAmountHour(0L);
+        result.setOfficialAccountOrderAmountMinute(0L);
+        result.setOfficialAccountFirstDayOrderAmountHour(0L);
+        result.setOfficialAccountFirstDayOrderAmountMinute(0L);
+        result.setOfficialAccountOrderRoiAll(0L);
+        result.setOfficialAccountOrderRoiHour(0L);
+        result.setOfficialAccountOrderRoiMinute(0L);
+        result.setOfficialAccountConsultCountHour(0L);
+        result.setOfficialAccountConsultCountMinute(0L);
+        result.setOfficialAccountReaderCountHour(0L);
+        result.setOfficialAccountReaderCountMinute(0L);
+        result.setOfficialAccountCreditApplyUserCountHour(0L);
+        result.setOfficialAccountCreditApplyUserCountMinute(0L);
+        result.setOfficialAccountCreditUserCountHour(0L);
+        result.setOfficialAccountCreditUserCountMinute(0L);
+        result.setForwardCountHour(0L);
+        result.setForwardCountMinute(0L);
+        result.setForwardUserCountHour(0L);
+        result.setForwardUserCountMinute(0L);
+        result.setNoInterestCountHour(0L);
+        result.setNoInterestCountMinute(0L);
+    }
+
     /**
      * 聚合
      *
-     * @param beforeYesterdayDayDWD 广告前天(包括前天)聚合的天数据(用来计算总数据)
-     * @param yesterdayMinuteDWD    昨天聚合的数据
-     * @param todayODSList          当前待聚合的数据(当天数据列表)
+     * @param yesterdayDayDWD 广告前天(包括前天)聚合的天数据(用来计算总数据)
+     * @param todayODSList    当前待聚合的数据(当天数据列表)
      * @param statODS
      * @return
      */
-    public static AdStatOfMinuteDWD reduce(AdStatOfDayDWD beforeYesterdayDayDWD, AdStatOfMinuteDWD yesterdayMinuteDWD, List<AdDataOfMinuteODS> todayODSList, AdDataOfMinuteODS statODS, AdStatOfMinuteDWD lastMinuteDWD, long createTime) {
+    public static AdStatOfMinuteDWD reduce(AdStatOfDayDWD yesterdayDayDWD, List<AdDataOfMinuteODS> todayODSList, AdDataOfMinuteODS statODS, AdStatOfMinuteDWD lastMinuteDWD, long createTime) {
         AdStatOfMinuteDWD result = new AdStatOfMinuteDWD();
         BeanUtils.copyProperties(statODS, result);
         result = initValue(result);
@@ -1762,108 +1904,70 @@ public class AdStatOfMinuteDWD {
             result.setNoInterestCountDay(result.getNoInterestCountDay() + todayODS.getNoInterestCount());
         }
         // 填充总数据
-        if (beforeYesterdayDayDWD != null) {
-            result.setCostDeviationRateTotal(beforeYesterdayDayDWD.getCostDeviationRateTotal());
-            result.setCostTotal(beforeYesterdayDayDWD.getCostTotal());
-            result.setCompensationAmountTotal(beforeYesterdayDayDWD.getCompensationAmountTotal());
-            result.setViewCountTotal(beforeYesterdayDayDWD.getViewCountTotal());
-            result.setThousandDisplayPriceAll(beforeYesterdayDayDWD.getThousandDisplayPriceAll());
-            result.setAvgViewPerUserAll(beforeYesterdayDayDWD.getAvgViewPerUserAll());
-            result.setValidClickCountTotal(beforeYesterdayDayDWD.getValidClickCountTotal());
-            result.setCtrAll(beforeYesterdayDayDWD.getCtrAll());
-            result.setCpcAll(beforeYesterdayDayDWD.getCpcAll());
-            result.setValuableClickCountTotal(beforeYesterdayDayDWD.getValuableClickCountTotal());
-            result.setValuableClickRateAll(beforeYesterdayDayDWD.getValuableClickRateAll());
-            result.setValuableClickCostAll(beforeYesterdayDayDWD.getValuableClickCostAll());
-            result.setConversionsCountTotal(beforeYesterdayDayDWD.getConversionsCountTotal());
-            result.setConversionsCostAll(beforeYesterdayDayDWD.getConversionsCostAll());
-            result.setConversionsRateAll(beforeYesterdayDayDWD.getConversionsRateAll());
-            result.setDeepConversionsCountTotal(beforeYesterdayDayDWD.getDeepConversionsCountTotal());
-            result.setDeepConversionsCostAll(beforeYesterdayDayDWD.getDeepConversionsCostAll());
-            result.setDeepConversionsRateAll(beforeYesterdayDayDWD.getDeepConversionsRateAll());
-            result.setOrderCountTotal(beforeYesterdayDayDWD.getOrderCountTotal());
-            result.setFirstDayOrderCountTotal(beforeYesterdayDayDWD.getFirstDayOrderCountTotal());
-            result.setWebOrderCostAll(beforeYesterdayDayDWD.getWebOrderCostAll());
-            result.setOrderRateAll(beforeYesterdayDayDWD.getOrderRateAll());
-            result.setOrderAmountTotal(beforeYesterdayDayDWD.getOrderAmountTotal());
-            result.setFirstDayOrderAmountTotal(beforeYesterdayDayDWD.getFirstDayOrderAmountTotal());
-            result.setOrderUnitPriceAll(beforeYesterdayDayDWD.getOrderUnitPriceAll());
-            result.setOrderRoiAll(beforeYesterdayDayDWD.getOrderRoiAll());
-            result.setSignInCountTotal(beforeYesterdayDayDWD.getSignInCountTotal());
-            result.setScanFollowCountTotal(beforeYesterdayDayDWD.getScanFollowCountTotal());
-            result.setWechatAppRegisterUvTotal(beforeYesterdayDayDWD.getWechatAppRegisterUvTotal());
-            result.setWechatMinigameRegisterCostAll(beforeYesterdayDayDWD.getWechatMinigameRegisterCostAll());
-            result.setWechatMinigameRegisterRateAll(beforeYesterdayDayDWD.getWechatMinigameRegisterRateAll());
-            result.setWechatMinigameArpuAll(beforeYesterdayDayDWD.getWechatMinigameArpuAll());
-            result.setWechatMinigameRetentionCountTotal(beforeYesterdayDayDWD.getWechatMinigameRetentionCountTotal());
-            result.setWechatMinigameCheckoutCountTotal(beforeYesterdayDayDWD.getWechatMinigameCheckoutCountTotal());
-            result.setWechatMinigameCheckoutAmountTotal(beforeYesterdayDayDWD.getWechatMinigameCheckoutAmountTotal());
-            result.setOfficialAccountFollowCountTotal(beforeYesterdayDayDWD.getOfficialAccountFollowCountTotal());
-            result.setOfficialAccountFollowRateAll(beforeYesterdayDayDWD.getOfficialAccountFollowRateAll());
-            result.setOfficialAccountRegisterUserCountTotal(beforeYesterdayDayDWD.getOfficialAccountRegisterUserCountTotal());
-            result.setOfficialAccountRegisterRateAll(beforeYesterdayDayDWD.getOfficialAccountRegisterRateAll());
-            result.setOfficialAccountRegisterCostAll(beforeYesterdayDayDWD.getOfficialAccountRegisterCostAll());
-            result.setOfficialAccountRegisterAmountTotal(beforeYesterdayDayDWD.getOfficialAccountRegisterAmountTotal());
-            result.setOfficialAccountRegisterRoiAll(beforeYesterdayDayDWD.getOfficialAccountRegisterRoiAll());
-            result.setOfficialAccountApplyCountTotal(beforeYesterdayDayDWD.getOfficialAccountApplyCountTotal());
-            result.setOfficialAccountApplyUserCountTotal(beforeYesterdayDayDWD.getOfficialAccountApplyUserCountTotal());
-            result.setOfficialAccountApplyRateAll(beforeYesterdayDayDWD.getOfficialAccountApplyRateAll());
-            result.setOfficialAccountApplyCostAll(beforeYesterdayDayDWD.getOfficialAccountApplyCostAll());
-            result.setOfficialAccountApplyAmountTotal(beforeYesterdayDayDWD.getOfficialAccountApplyAmountTotal());
-            result.setOfficialAccountApplyRoiAll(beforeYesterdayDayDWD.getOfficialAccountApplyRoiAll());
-            result.setOfficialAccountOrderCountTotal(beforeYesterdayDayDWD.getOfficialAccountOrderCountTotal());
-            result.setOfficialAccountFirstDayOrderCountTotal(beforeYesterdayDayDWD.getOfficialAccountFirstDayOrderCountTotal());
-            result.setOfficialAccountOrderUserCountTotal(beforeYesterdayDayDWD.getOfficialAccountOrderUserCountTotal());
-            result.setOfficialAccountOrderRateAll(beforeYesterdayDayDWD.getOfficialAccountOrderRateAll());
-            result.setOfficialAccountOrderCostAll(beforeYesterdayDayDWD.getOfficialAccountOrderCostAll());
-            result.setOfficialAccountOrderAmountTotal(beforeYesterdayDayDWD.getOfficialAccountOrderAmountTotal());
-            result.setOfficialAccountFirstDayOrderAmountTotal(beforeYesterdayDayDWD.getOfficialAccountFirstDayOrderAmountTotal());
-            result.setOfficialAccountOrderRoiAll(beforeYesterdayDayDWD.getOfficialAccountOrderRoiAll());
-            result.setOfficialAccountConsultCountTotal(beforeYesterdayDayDWD.getOfficialAccountConsultCountTotal());
-            result.setOfficialAccountReaderCountTotal(beforeYesterdayDayDWD.getOfficialAccountReaderCountTotal());
-            result.setOfficialAccountCreditApplyUserCountTotal(beforeYesterdayDayDWD.getOfficialAccountCreditApplyUserCountTotal());
-            result.setOfficialAccountCreditUserCountTotal(beforeYesterdayDayDWD.getOfficialAccountCreditUserCountTotal());
-            result.setForwardCountTotal(beforeYesterdayDayDWD.getForwardCountTotal());
-            result.setForwardUserCountTotal(beforeYesterdayDayDWD.getForwardUserCountTotal());
-            result.setNoInterestCountTotal(beforeYesterdayDayDWD.getNoInterestCountTotal());
-        }
-        if (yesterdayMinuteDWD != null) {
-            result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + yesterdayMinuteDWD.getCostDeviationRateDay());
-            result.setCostTotal(result.getCostTotal() + yesterdayMinuteDWD.getCostDay());
-            result.setCompensationAmountTotal(result.getCompensationAmountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
-            result.setViewCountTotal(result.getViewCountTotal() + yesterdayMinuteDWD.getViewCountDay());
-            result.setValidClickCountTotal(result.getValidClickCountTotal() + yesterdayMinuteDWD.getValidClickCountDay());
-            result.setValuableClickCountTotal(result.getValuableClickCountTotal() + yesterdayMinuteDWD.getValuableClickCountDay());
-            result.setConversionsCountTotal(result.getConversionsCountTotal() + yesterdayMinuteDWD.getConversionsCountDay());
-            result.setDeepConversionsCountTotal(result.getDeepConversionsCountTotal() + yesterdayMinuteDWD.getDeepConversionsCountDay());
-            result.setOrderCountTotal(result.getOrderCountTotal() + yesterdayMinuteDWD.getOrderCountDay());
-            result.setFirstDayOrderCountTotal(result.getFirstDayOrderCountTotal() + yesterdayMinuteDWD.getFirstDayOrderCountDay());
-            result.setOrderAmountTotal(result.getOrderAmountTotal() + yesterdayMinuteDWD.getOrderAmountDay());
-            result.setFirstDayOrderAmountTotal(result.getFirstDayOrderAmountTotal() + yesterdayMinuteDWD.getFirstDayOrderAmountDay());
-            result.setSignInCountTotal(result.getSignInCountTotal() + yesterdayMinuteDWD.getSignInCountDay());
-            result.setScanFollowCountTotal(result.getScanFollowCountTotal() + yesterdayMinuteDWD.getScanFollowCountDay());
-            result.setWechatAppRegisterUvTotal(result.getWechatAppRegisterUvTotal() + yesterdayMinuteDWD.getWechatAppRegisterUvDay());
-            result.setWechatMinigameRetentionCountTotal(result.getWechatMinigameRetentionCountTotal() + yesterdayMinuteDWD.getWechatMinigameRetentionCountDay());
-            result.setWechatMinigameCheckoutCountTotal(result.getWechatMinigameCheckoutCountTotal() + yesterdayMinuteDWD.getWechatMinigameCheckoutCountDay());
-            result.setWechatMinigameCheckoutAmountTotal(result.getWechatMinigameCheckoutAmountTotal() + yesterdayMinuteDWD.getWechatMinigameCheckoutAmountDay());
-            result.setOfficialAccountFollowCountTotal(result.getOfficialAccountFollowCountTotal() + yesterdayMinuteDWD.getOfficialAccountFollowCountDay());
-            result.setOfficialAccountRegisterUserCountTotal(result.getOfficialAccountRegisterUserCountTotal() + yesterdayMinuteDWD.getOfficialAccountRegisterUserCountDay());
-            result.setOfficialAccountRegisterAmountTotal(result.getOfficialAccountRegisterAmountTotal() + yesterdayMinuteDWD.getOfficialAccountRegisterAmountDay());
-            result.setOfficialAccountApplyCountTotal(result.getOfficialAccountApplyCountTotal() + yesterdayMinuteDWD.getOfficialAccountApplyCountDay());
-            result.setOfficialAccountApplyUserCountTotal(result.getOfficialAccountApplyUserCountTotal() + yesterdayMinuteDWD.getOfficialAccountApplyUserCountDay());
-            result.setOfficialAccountApplyAmountTotal(result.getOfficialAccountApplyAmountTotal() + yesterdayMinuteDWD.getOfficialAccountApplyAmountDay());
-            result.setOfficialAccountOrderCountTotal(result.getOfficialAccountOrderCountTotal() + yesterdayMinuteDWD.getOfficialAccountOrderCountDay());
-            result.setOfficialAccountFirstDayOrderCountTotal(result.getOfficialAccountFirstDayOrderCountTotal() + yesterdayMinuteDWD.getOfficialAccountFirstDayOrderCountDay());
-            result.setOfficialAccountOrderUserCountTotal(result.getOfficialAccountOrderUserCountTotal() + yesterdayMinuteDWD.getOfficialAccountOrderUserCountDay());
-            result.setOfficialAccountOrderAmountTotal(result.getOfficialAccountOrderAmountTotal() + yesterdayMinuteDWD.getOfficialAccountOrderAmountDay());
-            result.setOfficialAccountFirstDayOrderAmountTotal(result.getOfficialAccountFirstDayOrderAmountTotal() + yesterdayMinuteDWD.getOfficialAccountFirstDayOrderAmountDay());
-            result.setOfficialAccountConsultCountTotal(result.getOfficialAccountConsultCountTotal() + yesterdayMinuteDWD.getOfficialAccountConsultCountDay());
-            result.setOfficialAccountReaderCountTotal(result.getOfficialAccountReaderCountTotal() + yesterdayMinuteDWD.getOfficialAccountReaderCountDay());
-            result.setOfficialAccountCreditApplyUserCountTotal(result.getOfficialAccountCreditApplyUserCountTotal() + yesterdayMinuteDWD.getOfficialAccountCreditApplyUserCountDay());
-            result.setOfficialAccountCreditUserCountTotal(result.getOfficialAccountCreditUserCountTotal() + yesterdayMinuteDWD.getOfficialAccountCreditUserCountDay());
-            result.setForwardCountTotal(result.getForwardCountTotal() + yesterdayMinuteDWD.getForwardCountDay());
-            result.setForwardUserCountTotal(result.getForwardUserCountTotal() + yesterdayMinuteDWD.getForwardUserCountDay());
-            result.setNoInterestCountTotal(result.getNoInterestCountTotal() + yesterdayMinuteDWD.getNoInterestCountDay());
+        if (yesterdayDayDWD != null) {
+            result.setCostDeviationRateTotal(yesterdayDayDWD.getCostDeviationRateTotal());
+            result.setCostTotal(yesterdayDayDWD.getCostTotal());
+            result.setCompensationAmountTotal(yesterdayDayDWD.getCompensationAmountTotal());
+            result.setViewCountTotal(yesterdayDayDWD.getViewCountTotal());
+            result.setThousandDisplayPriceAll(yesterdayDayDWD.getThousandDisplayPriceAll());
+            result.setAvgViewPerUserAll(yesterdayDayDWD.getAvgViewPerUserAll());
+            result.setValidClickCountTotal(yesterdayDayDWD.getValidClickCountTotal());
+            result.setCtrAll(yesterdayDayDWD.getCtrAll());
+            result.setCpcAll(yesterdayDayDWD.getCpcAll());
+            result.setValuableClickCountTotal(yesterdayDayDWD.getValuableClickCountTotal());
+            result.setValuableClickRateAll(yesterdayDayDWD.getValuableClickRateAll());
+            result.setValuableClickCostAll(yesterdayDayDWD.getValuableClickCostAll());
+            result.setConversionsCountTotal(yesterdayDayDWD.getConversionsCountTotal());
+            result.setConversionsCostAll(yesterdayDayDWD.getConversionsCostAll());
+            result.setConversionsRateAll(yesterdayDayDWD.getConversionsRateAll());
+            result.setDeepConversionsCountTotal(yesterdayDayDWD.getDeepConversionsCountTotal());
+            result.setDeepConversionsCostAll(yesterdayDayDWD.getDeepConversionsCostAll());
+            result.setDeepConversionsRateAll(yesterdayDayDWD.getDeepConversionsRateAll());
+            result.setOrderCountTotal(yesterdayDayDWD.getOrderCountTotal());
+            result.setFirstDayOrderCountTotal(yesterdayDayDWD.getFirstDayOrderCountTotal());
+            result.setWebOrderCostAll(yesterdayDayDWD.getWebOrderCostAll());
+            result.setOrderRateAll(yesterdayDayDWD.getOrderRateAll());
+            result.setOrderAmountTotal(yesterdayDayDWD.getOrderAmountTotal());
+            result.setFirstDayOrderAmountTotal(yesterdayDayDWD.getFirstDayOrderAmountTotal());
+            result.setOrderUnitPriceAll(yesterdayDayDWD.getOrderUnitPriceAll());
+            result.setOrderRoiAll(yesterdayDayDWD.getOrderRoiAll());
+            result.setSignInCountTotal(yesterdayDayDWD.getSignInCountTotal());
+            result.setScanFollowCountTotal(yesterdayDayDWD.getScanFollowCountTotal());
+            result.setWechatAppRegisterUvTotal(yesterdayDayDWD.getWechatAppRegisterUvTotal());
+            result.setWechatMinigameRegisterCostAll(yesterdayDayDWD.getWechatMinigameRegisterCostAll());
+            result.setWechatMinigameRegisterRateAll(yesterdayDayDWD.getWechatMinigameRegisterRateAll());
+            result.setWechatMinigameArpuAll(yesterdayDayDWD.getWechatMinigameArpuAll());
+            result.setWechatMinigameRetentionCountTotal(yesterdayDayDWD.getWechatMinigameRetentionCountTotal());
+            result.setWechatMinigameCheckoutCountTotal(yesterdayDayDWD.getWechatMinigameCheckoutCountTotal());
+            result.setWechatMinigameCheckoutAmountTotal(yesterdayDayDWD.getWechatMinigameCheckoutAmountTotal());
+            result.setOfficialAccountFollowCountTotal(yesterdayDayDWD.getOfficialAccountFollowCountTotal());
+            result.setOfficialAccountFollowRateAll(yesterdayDayDWD.getOfficialAccountFollowRateAll());
+            result.setOfficialAccountRegisterUserCountTotal(yesterdayDayDWD.getOfficialAccountRegisterUserCountTotal());
+            result.setOfficialAccountRegisterRateAll(yesterdayDayDWD.getOfficialAccountRegisterRateAll());
+            result.setOfficialAccountRegisterCostAll(yesterdayDayDWD.getOfficialAccountRegisterCostAll());
+            result.setOfficialAccountRegisterAmountTotal(yesterdayDayDWD.getOfficialAccountRegisterAmountTotal());
+            result.setOfficialAccountRegisterRoiAll(yesterdayDayDWD.getOfficialAccountRegisterRoiAll());
+            result.setOfficialAccountApplyCountTotal(yesterdayDayDWD.getOfficialAccountApplyCountTotal());
+            result.setOfficialAccountApplyUserCountTotal(yesterdayDayDWD.getOfficialAccountApplyUserCountTotal());
+            result.setOfficialAccountApplyRateAll(yesterdayDayDWD.getOfficialAccountApplyRateAll());
+            result.setOfficialAccountApplyCostAll(yesterdayDayDWD.getOfficialAccountApplyCostAll());
+            result.setOfficialAccountApplyAmountTotal(yesterdayDayDWD.getOfficialAccountApplyAmountTotal());
+            result.setOfficialAccountApplyRoiAll(yesterdayDayDWD.getOfficialAccountApplyRoiAll());
+            result.setOfficialAccountOrderCountTotal(yesterdayDayDWD.getOfficialAccountOrderCountTotal());
+            result.setOfficialAccountFirstDayOrderCountTotal(yesterdayDayDWD.getOfficialAccountFirstDayOrderCountTotal());
+            result.setOfficialAccountOrderUserCountTotal(yesterdayDayDWD.getOfficialAccountOrderUserCountTotal());
+            result.setOfficialAccountOrderRateAll(yesterdayDayDWD.getOfficialAccountOrderRateAll());
+            result.setOfficialAccountOrderCostAll(yesterdayDayDWD.getOfficialAccountOrderCostAll());
+            result.setOfficialAccountOrderAmountTotal(yesterdayDayDWD.getOfficialAccountOrderAmountTotal());
+            result.setOfficialAccountFirstDayOrderAmountTotal(yesterdayDayDWD.getOfficialAccountFirstDayOrderAmountTotal());
+            result.setOfficialAccountOrderRoiAll(yesterdayDayDWD.getOfficialAccountOrderRoiAll());
+            result.setOfficialAccountConsultCountTotal(yesterdayDayDWD.getOfficialAccountConsultCountTotal());
+            result.setOfficialAccountReaderCountTotal(yesterdayDayDWD.getOfficialAccountReaderCountTotal());
+            result.setOfficialAccountCreditApplyUserCountTotal(yesterdayDayDWD.getOfficialAccountCreditApplyUserCountTotal());
+            result.setOfficialAccountCreditUserCountTotal(yesterdayDayDWD.getOfficialAccountCreditUserCountTotal());
+            result.setForwardCountTotal(yesterdayDayDWD.getForwardCountTotal());
+            result.setForwardUserCountTotal(yesterdayDayDWD.getForwardUserCountTotal());
+            result.setNoInterestCountTotal(yesterdayDayDWD.getNoInterestCountTotal());
         }
         result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + result.getCostDeviationRateDay());
         result.setCostTotal(result.getCostTotal() + result.getCostDay());

+ 3 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDTOStreamProcess.java

@@ -11,6 +11,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.springframework.beans.BeanUtils;
 
+import java.time.LocalDateTime;
 import java.util.Date;
 
 /**
@@ -31,7 +32,8 @@ public class AdHourDTOStreamProcess extends ProcessFunction<String, AdDataOfMinu
         // 指记录被创建的时间
         long createTime = dto.getCreateTime();
         // 指记录统计的时间(如果是实时统计的数据精确到分钟,回滚的历史数据精确到天)
-        long statTime = DateUtil.localDateTimeToMilli(DateUtil.milliToLocalDateTime(dto.getDataTime()).withSecond(0).withNano(0));
+        LocalDateTime statDateTime = DateUtil.milliToLocalDateTime(dto.getDataTime()).withSecond(0).withNano(0);
+        long statTime = DateUtil.localDateTimeToMilli(statDateTime);
         HourlyReportsGetListStruct struct = dto.getHourlyReportsGetListStruct();
 
         if (createTime - statTime > (60 * 60 * 1000L)) {

+ 33 - 28
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -8,10 +8,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.*;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -24,9 +21,7 @@ import org.apache.ibatis.session.SqlSessionFactoryBuilder;
 import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
 
 import java.time.LocalDate;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
+import java.util.*;
 
 /**
  * 往前回滚 10天的小时数据(警告,此数据依赖小时数据必定在天数据之后拉取来实现的。不然会拉下昨天的数据没有统计进来)
@@ -35,11 +30,11 @@ import java.util.stream.Collectors;
 public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS, AdStatOfHourDWD> {
     private SqlSessionFactory sqlSessionFactory;
     // 上次查询的天数据
-    private ValueState<String> lastQueryDayState;
+    private ValueState<Long> lastQueryTimeState;
     // 聚合的天的数据
-    private MapState<String, AdStatOfDayDWD> historyReduceState;
+    private ListState<AdStatOfDayDWD> historyReduceState;
     // 上小时聚合的结果
-    private MapState<String, AdStatOfHourDWD> lastReduceState;
+    private MapState<String, Map<Integer, AdDataOfHourODS>> historyState;
 
     @Override
     public void open(Configuration conf) {
@@ -64,9 +59,9 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
         configuration.addMapper(AdStatOfDayDWDMapper.class);
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.POJO(AdStatOfDayDWD.class)));
-        lastReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("lastReduceState", Types.STRING, Types.POJO(AdStatOfHourDWD.class)));
+        lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG));
+        historyReduceState = getRuntimeContext().getListState(new ListStateDescriptor<>("historyReduceState", Types.POJO(AdStatOfDayDWD.class)));
+        historyState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdDataOfHourODS.class))));
     }
 
     @Override
@@ -76,34 +71,44 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
         LocalDate today = LocalDate.now();
         long adId = element.getAdId();
 
-        String lastQueryDay = lastQueryDayState.value();
-        // 从 maxCompute拉取指定 广告的历史数据
-        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
+        // 每小时从 ck更新下历史的天数据
+        Long lastQueryTime = lastQueryTimeState.value();
+        if (lastQueryTime == null || (now - lastQueryTime > 60 * 60 * 1000)) {
             try (SqlSession session = sqlSessionFactory.openSession()) {
                 AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
-                Map<String, AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60).stream()
-                        .collect(Collectors.toMap(AdStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
+                List<AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60);
                 historyReduceState.clear();
-                historyReduceState.putAll(historyData);
-                lastQueryDayState.update(DateUtil.formatLocalDate(today));
+                historyReduceState.update(historyData);
+                lastQueryTimeState.update(lastQueryTime);
             }
         }
         AdStatOfDayDWD yesterdayReduceData = null;
-        for (int i = 1; i < 60; i++) {
-            LocalDate day = statDay.minusDays(i);
-            yesterdayReduceData = historyReduceState.get(DateUtil.formatLocalDate(day));
-            if (yesterdayReduceData != null) {
+        for (AdStatOfDayDWD dayDWD : historyReduceState.get()) {
+            if (dayDWD.getStatDay().compareTo(element.getStatDay()) < 0) {
+                yesterdayReduceData = dayDWD;
                 break;
             }
         }
 
-        AdStatOfHourDWD lastReduce = lastReduceState.get(element.getStatDay());
+        Map<Integer, AdDataOfHourODS> dayODSHourMap = historyState.get(element.getStatDay());
+        List<AdDataOfHourODS> dayODSList = new ArrayList<>();
+        dayODSList.add(element);
+        if (dayODSHourMap == null) {
+            dayODSHourMap = new HashMap<>(24);
+        }
+        if (!dayODSHourMap.isEmpty()) {
+            dayODSList = new ArrayList<>(dayODSHourMap.values());
+        }
 
-        AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(yesterdayReduceData, lastReduce, element, now);
-        lastReduceState.put(newStatData.getStatDay(), newStatData);
+        AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(yesterdayReduceData, dayODSList, element, now);
         collector.collect(newStatData);
 
+        dayODSHourMap.put(element.getHour(), element);
+        historyState.put(element.getStatDay(), dayODSHourMap);
+
         // 往前清理 15天的数据
-        lastReduceState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
+        historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
+        historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(16L)));
+        historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(17L)));
     }
 }

+ 70 - 73
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -6,10 +6,8 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.*;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -20,6 +18,7 @@ import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.apache.ibatis.session.SqlSessionFactoryBuilder;
 import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+import org.springframework.beans.BeanUtils;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -33,12 +32,10 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
 
     private SqlSessionFactory sqlSessionFactory;
+    // 上次查询的天数据的时间
+    private ValueState<Long> lastQueryTimeState;
     // 历史的天数据
-    private ValueState<AdStatOfDayDWD> historyDayState;
-    // 上次查询的天数据
-    private ValueState<String> lastQueryDayState;
-    // 之前聚合的昨天的数据
-    private MapState<String, AdStatOfMinuteDWD> yesterdayMinuteState;
+    private ListState<AdStatOfDayDWD> historyDayState;
     // 前 5分钟聚合的数据
     private MapState<String, AdStatOfMinuteDWD> lastReduceState;
 
@@ -68,10 +65,9 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         configuration.addMapper(AdStatOfDayDWDMapper.class);
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
-        historyDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("historyDayState", AdStatOfDayDWD.class));
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-        yesterdayMinuteState = getRuntimeContext().getMapState(new MapStateDescriptor<>("yesterdayMinuteState", String.class, AdStatOfMinuteDWD.class));
-        lastReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("lastReduceState", String.class, AdStatOfMinuteDWD.class));
+        historyDayState = getRuntimeContext().getListState(new ListStateDescriptor<>("historyDayState", Types.POJO(AdStatOfDayDWD.class)));
+        lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG));
+        lastReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("lastReduceState", Types.STRING, Types.POJO(AdStatOfMinuteDWD.class)));
     }
 
     @Override
@@ -83,65 +79,81 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         String statDay = DateUtil.formatLocalDate(beginDate);
         int hour = beginDateTime.getHour();
         long now = System.currentTimeMillis();
-        AdDataOfMinuteODS statODS = null;
-        List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
-        for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {
-            adDataOfMinuteODSList.add(adDataOfMinuteODS);
-            if (adDataOfMinuteODS.getHour() != hour) {
-                continue;
+
+        AdDataOfMinuteODS thisHourMinuteODS = null;
+        List<AdDataOfMinuteODS> thisHourMinuteODSList = new ArrayList<>(24);
+        for (AdDataOfMinuteODS minuteODS : iterable) {
+            thisHourMinuteODSList.add(minuteODS);
+            if (minuteODS.getHour() == hour) {
+                thisHourMinuteODS = minuteODS;
             }
-            statODS = adDataOfMinuteODS;
-        }
-        if (statODS == null) {
-            log.error("分钟流窗口没有拿到整点的小时数据!!!!");
-            return;
         }
+        Long adId = thisHourMinuteODSList.get(0).getAdId();
 
-        Long adId = statODS.getAdId();
-
-        // 昨天聚合的数据
-        AdStatOfMinuteDWD yesterdayMinuteDWD = yesterdayMinuteState.get(DateUtil.formatLocalDate(beginDate.minusDays(1L)));
-        // 之前的数据
-        String lastQueryDay = lastQueryDayState.value();
-        if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
+        // 每 2小时更新下之前的历史数据
+        Long lastQueryTime = lastQueryTimeState.value();
+        if (lastQueryTime == null || (beginTime - lastQueryTime > 2 * 60 * 60 * 1000L)) {
             try (SqlSession session = sqlSessionFactory.openSession()) {
                 AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
-                List<AdStatOfDayDWD> historyDayData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(beginDate.minusDays(2L)), 1);
-                if (!historyDayData.isEmpty()) {
-                    historyDayState.update(historyDayData.get(historyDayData.size() - 1));
+                List<AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(beginDate.minusDays(1L)), 6);
+                historyDayState.clear();
+                if (!historyData.isEmpty()) {
+                    historyDayState.update(historyData);
                 }
-                lastQueryDayState.update(statDay);
+                lastQueryTimeState.update(beginTime);
+            }
+        }
+
+        AdStatOfDayDWD yesterdayDayDWD = null;
+        for (AdStatOfDayDWD dayDWD : historyDayState.get()) {
+            if (dayDWD.getStatDay().compareTo(statDay) < 0) {
+                yesterdayDayDWD = dayDWD;
+                break;
             }
         }
-        AdStatOfDayDWD beforeYesterdayDayDWD = historyDayState.value();
-
-        AdStatOfMinuteDWD lastReduceData = lastReduceState.get(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey));
-        // 聚合当天的全部数据
-        AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceData, now);
-        collector.collect(newAdStat);
-        lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
-        yesterdayMinuteState.put(statDay, newAdStat);
-
-        if (hour > 0 && (beginDateTime.getMinute() == 15 || beginDateTime.getMinute() == 30 || beginDateTime.getMinute() == 55)) {
-            // 重新输出一条上小时的数据
-            List<AdDataOfMinuteODS> lastHourODSList = adDataOfMinuteODSList.stream().filter(ods -> ods.getHour() < hour).collect(Collectors.toList());
-            AdDataOfMinuteODS lastHourODS = null;
-            for (AdDataOfMinuteODS ods : lastHourODSList) {
-                if (ods.getHour() == hour - 1) {
-                    lastHourODS = ods;
-                    break;
+
+        // 重新聚合上小时的数据
+        if (hour > 0 && (beginDateTime.getMinute() == 0 || beginDateTime.getMinute() == 15 || beginDateTime.getMinute() == 55)) {
+            AdDataOfMinuteODS lastHourMinuteODS = null;
+            List<AdDataOfMinuteODS> lastHourMinuteODSList = new ArrayList<>(23);
+            for (AdDataOfMinuteODS minuteODS : thisHourMinuteODSList) {
+                if (minuteODS.getHour() >= hour) {
+                    continue;
+                }
+                lastHourMinuteODSList.add(minuteODS);
+                if (minuteODS.getHour() == hour - 1) {
+                    lastHourMinuteODS = minuteODS;
                 }
             }
-            if (lastHourODS != null) {
-                LocalDateTime lastHourBeginTime = LocalDateTime.of(beginDate, LocalTime.of(lastHourODS.getHour(), 55, 0));
-                lastHourODS.setStatTime(DateUtil.localDateTimeToMilli(lastHourBeginTime));
-                AdStatOfMinuteDWD lastHourReduceData = lastReduceState.get(lastHourBeginTime.minusMinutes(5L).format(formatForLastReduceKey));
+            if (thisHourMinuteODS == null && lastHourMinuteODS != null) {
+                LocalDateTime lastHourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour - 1, 55, 0));
+                lastHourMinuteODS.setStatTime(DateUtil.localDateTimeToMilli(lastHourStatTime));
+                AdStatOfMinuteDWD lastReduce = lastReduceState.get(lastHourStatTime.minusMinutes(5L).format(formatForLastReduceKey));
                 // 聚合当天的全部数据
-                AdStatOfMinuteDWD lastHourAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, lastHourODSList, lastHourODS, lastHourReduceData, now);
+                AdStatOfMinuteDWD lastHourAdStat = AdStatOfMinuteDWD.reduce(yesterdayDayDWD, lastHourMinuteODSList, lastHourMinuteODS, lastReduce, now);
                 collector.collect(lastHourAdStat);
+
+                // 造一条空数据
+                if (beginDateTime.getMinute() == 0) {
+                    LocalDateTime hourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour, 0, 0));
+                    AdStatOfMinuteDWD hourTemp = new AdStatOfMinuteDWD();
+                    BeanUtils.copyProperties(lastReduce, hourTemp);
+                    hourTemp.setHour(hour);
+                    hourTemp.setStatTime(DateUtil.localDateTimeToMilli(hourStatTime));
+                    AdStatOfMinuteDWD.resetHourAndMinute(hourTemp);
+                    collector.collect(hourTemp);
+                }
             }
         }
 
+        if (thisHourMinuteODS != null) {
+            AdStatOfMinuteDWD lastReduce = lastReduceState.get(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey));
+            // 聚合当天的全部数据
+            AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(yesterdayDayDWD, thisHourMinuteODSList, thisHourMinuteODS, lastReduce, now);
+            collector.collect(newAdStat);
+            lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
+        }
+
         clearState(beginDateTime);
     }
 
@@ -153,7 +165,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         List<String> removeKeys = new ArrayList<>(20);
         Iterator<Map.Entry<String, AdStatOfMinuteDWD>> lastIterator = lastReduceState.iterator();
 
-        String delLastReduceKey = beginDateTime.minusHours(2).format(formatForLastReduceKey);
+        String delLastReduceKey = beginDateTime.minusHours(3).format(formatForLastReduceKey);
         while (lastIterator.hasNext()) {
             Map.Entry<String, AdStatOfMinuteDWD> temp = lastIterator.next();
             if (temp.getKey().compareTo(delLastReduceKey) < 0) {
@@ -165,20 +177,5 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
                 lastReduceState.remove(key);
             }
         }
-        removeKeys.clear();
-        Iterator<Map.Entry<String, AdStatOfMinuteDWD>> yesterdayIterator = yesterdayMinuteState.iterator();
-
-        String delYesterdayKey = DateUtil.formatLocalDate(beginDateTime.toLocalDate().minusDays(5));
-        while (yesterdayIterator.hasNext()) {
-            Map.Entry<String, AdStatOfMinuteDWD> temp = yesterdayIterator.next();
-            if (temp.getKey().compareTo(delYesterdayKey) <= 0) {
-                removeKeys.add(temp.getKey());
-            }
-        }
-        if (!removeKeys.isEmpty()) {
-            for (String key : removeKeys) {
-                yesterdayMinuteState.remove(key);
-            }
-        }
     }
 }

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -50,7 +50,7 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
             lastThreeHourDay = day.minusDays(1L);
             lastThreeHour = 23;
         }
-        long costDiff = 0L, costLastHour = 0L;
+        long costDiff = adStatOfHourDWD.getCostHour(), costLastHour = 0L;
         Map<Integer, AdStatOfHourDWD> lastHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastHourDay));
         if (lastHourMapping != null && !lastHourMapping.isEmpty()) {
             AdStatOfHourDWD lastHourDWD = lastHourMapping.get(lastHour);
@@ -59,7 +59,7 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
                 costDiff = adStatOfHourDWD.getCostHour() - lastHourDWD.getCostHour();
             }
         }
-        long costLastHourDiff = 0, costLastTwoHour = 0;
+        long costLastHourDiff = costLastHour, costLastTwoHour = 0L;
         Map<Integer, AdStatOfHourDWD> lastTwoHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastTwoHourDay));
         if (lastTwoHourMapping != null && !lastTwoHourMapping.isEmpty()) {
             AdStatOfHourDWD lastTwoHourDWD = lastTwoHourMapping.get(lastTwoHour);
@@ -68,16 +68,16 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
                 costLastHourDiff = costLastHour - lastTwoHourDWD.getCostHour();
             }
         }
-        long costLastTwoHourDiff = 0, costLastThreeTrend = 0, costLastThreeHour = 0;
+        long costLastTwoHourDiff = costLastTwoHour, costLastThreeHour = 0;
         Map<Integer, AdStatOfHourDWD> lastThreeHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastThreeHourDay));
         if (lastThreeHourMapping != null && !lastThreeHourMapping.isEmpty()) {
             AdStatOfHourDWD lastThreeHourDWD = lastThreeHourMapping.get(lastThreeHour);
             if (lastThreeHourDWD != null) {
-                costLastThreeTrend = costLastHourDiff > 0 && costDiff > 0 ? 1 : 0;
                 costLastThreeHour = lastThreeHourDWD.getCostHour();
                 costLastTwoHourDiff = costLastTwoHour - lastThreeHourDWD.getCostHour();
             }
         }
+        long costLastThreeTrend = costLastHourDiff > 0 && costDiff > 0 ? 1 : 0;
 
         if (historyReduceState.get(adStatOfHourDWD.getStatDay()) == null) {
             Map<Integer, AdStatOfHourDWD> hourMapping = new HashMap<>(24);

+ 12 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/AdMinuteODSStreamTrigger.java

@@ -6,6 +6,8 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.LocalDateTime;
+
 public class AdMinuteODSStreamTrigger extends Trigger<AdDataOfMinuteODS, TimeWindow> {
     /**
      * @param adDataOfMinuteODS
@@ -17,9 +19,18 @@ public class AdMinuteODSStreamTrigger extends Trigger<AdDataOfMinuteODS, TimeWin
      */
     @Override
     public TriggerResult onElement(AdDataOfMinuteODS adDataOfMinuteODS, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
-        if (adDataOfMinuteODS.getHour() == DateUtil.milliToLocalDateTime(time).getHour()) {
+        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(timeWindow.getStart());
+        if (adDataOfMinuteODS.getHour() == beginDateTime.getHour()) {
             return TriggerResult.FIRE;
         }
+        // 回滚上小时的数据
+        if (beginDateTime.getHour() > 0 && (adDataOfMinuteODS.getHour() == beginDateTime.getHour() - 1)) {
+            int minute = beginDateTime.getMinute();
+            // 分别在 5分钟、20分钟、1小时后重新计算上小时的数据
+            if (minute == 0 || minute == 15 || minute == 55) {
+                return TriggerResult.FIRE;
+            }
+        }
         return TriggerResult.CONTINUE;
     }