Selaa lähdekoodia

完善造实时的假数据的逻辑

wcc 3 vuotta sitten
vanhempi
commit
1cace472b6

+ 53 - 78
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfMinuteDWD.java

@@ -1818,83 +1818,31 @@ public class AdStatOfMinuteDWD {
             result.setCostDay(result.getCostDay() + todayODS.getCost());
             result.setCompensationAmountDay(result.getCompensationAmountDay() + todayODS.getCompensationAmount());
             result.setViewCountDay(result.getViewCountDay() + todayODS.getViewCount());
-            // 总消耗 / 总曝光
-            result.setThousandDisplayPriceDay(result.getViewCountDay() == 0 ? 0 : (result.getCostDay() / result.getViewCountDay() * 1000));
             result.setValidClickCountDay(result.getValidClickCountDay() + todayODS.getValidClickCount());
-            // 广告点击次数 / 广告曝光次数
-            result.setCtrDay(result.getViewCountDay() == 0 ? 0.0 : result.getValidClickCountDay() / result.getViewCountDay());
-            // 广告花费/广告点击次数
-            result.setCpcDay(result.getValidClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValidClickCountDay());
             result.setValuableClickCountDay(result.getValuableClickCountDay() + todayODS.getValuableClickCount());
-            // 广告可转化点击次数/广告曝光次数
-            result.setValuableClickRateDay(result.getViewCountDay() == 0 ? 0.0 : result.getValuableClickCountDay() / result.getViewCountDay());
-            // 广告花费/可转化点击次数
-            result.setValuableClickCostDay(result.getValuableClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValuableClickCountDay());
             result.setConversionsCountDay(result.getConversionsCountDay() + todayODS.getConversionsCount());
-            // 广告花费/转化目标量
-            result.setConversionsCostDay(result.getConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getConversionsCountDay());
-            // 公众号:转化目标量/点击次数。
-            result.setConversionsRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getConversionsCountDay() / result.getValidClickCountDay());
             result.setDeepConversionsCountDay(result.getDeepConversionsCountDay() + todayODS.getDeepConversionsCount());
-            // 广告花费/深度转化目标量
-            result.setDeepConversionsCostDay(result.getDeepConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getDeepConversionsCountDay());
-            // 深度转化目标量/可转化点击次数
-            result.setDeepConversionsRateDay(result.getValuableClickCountDay() == 0 ? 0.0 : result.getDeepConversionsCountDay() / result.getValuableClickCountDay());
             result.setOrderCountDay(result.getOrderCountDay() + todayODS.getOrderCount());
             result.setFirstDayOrderCountDay(result.getFirstDayOrderCountDay() + todayODS.getFirstDayOrderCount());
-            // 广告花费/下单量
-            result.setWebOrderCostDay(result.getOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOrderCountDay());
-            // 下单量/点击次数
-            result.setOrderRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOrderCountDay() / result.getValidClickCountDay());
             result.setOrderAmountDay(result.getOrderAmountDay() + todayODS.getOrderAmount());
             result.setFirstDayOrderAmountDay(result.getFirstDayOrderAmountDay() + todayODS.getFirstDayOrderAmount());
-            // 下单金额/下单量
-            result.setOrderUnitPriceDay(result.getOrderCountDay() == 0 ? 0 : result.getOrderAmountDay() / result.getOrderCountDay());
-            // 下单金额/广告花费
-            result.setOrderRoiDay(result.getCostDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getCostDay());
             result.setSignInCountDay(result.getSignInCountDay() + todayODS.getSignInCount());
             result.setScanFollowCountDay(result.getScanFollowCountDay() + todayODS.getScanFollowCount());
             result.setWechatAppRegisterUvDay(result.getWechatAppRegisterUvDay() + todayODS.getWechatAppRegisterUv());
-            // 广告消耗 / 小游戏注册人数
-            result.setWechatMinigameRegisterCostDay(result.getWechatAppRegisterUvDay() == 0 ? 0 : result.getCostDay() / result.getWechatAppRegisterUvDay());
-            // 小游戏注册人数 / 广告点击次数
-            result.setWechatMinigameRegisterRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getWechatAppRegisterUvDay() / result.getValidClickCountDay());
-            // 总收益 / 总人数
-            result.setWechatMinigameArpuDay(result.getWechatAppRegisterUvDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getWechatAppRegisterUvDay());
             result.setWechatMinigameRetentionCountDay(result.getWechatMinigameRetentionCountDay() + todayODS.getWechatMinigameRetentionCount());
             result.setWechatMinigameCheckoutCountDay(result.getWechatMinigameCheckoutCountDay() + todayODS.getWechatMinigameCheckoutCount());
             result.setWechatMinigameCheckoutAmountDay(result.getWechatMinigameCheckoutAmountDay() + todayODS.getWechatMinigameCheckoutAmount());
             result.setOfficialAccountFollowCountDay(result.getOfficialAccountFollowCountDay() + todayODS.getOfficialAccountFollowCount());
-            // 关注次数 / 点击次数
-            result.setOfficialAccountFollowRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOfficialAccountFollowCountDay() / result.getValidClickCountDay());
             result.setOfficialAccountRegisterUserCountDay(result.getOfficialAccountRegisterUserCountDay() + todayODS.getOfficialAccountRegisterUserCount());
-            // 公众号内注册人数 / 公众号关注次数
-            result.setOfficialAccountRegisterRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountRegisterUserCountDay() / result.getOfficialAccountFollowCountDay());
-            // 广告消耗 / 广告注册人数
-            result.setOfficialAccountRegisterCostDay(result.getOfficialAccountRegisterUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountRegisterUserCountDay());
             result.setOfficialAccountRegisterAmountDay(result.getOfficialAccountRegisterAmountDay() + todayODS.getOfficialAccountRegisterAmount());
-            // 注册产生的订单金额累计/广告花费
-            result.setOfficialAccountRegisterRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountRegisterAmountDay() / result.getCostDay());
             result.setOfficialAccountApplyCountDay(result.getOfficialAccountApplyCountDay() + todayODS.getOfficialAccountApplyCount());
             result.setOfficialAccountApplyUserCountDay(result.getOfficialAccountApplyUserCountDay() + todayODS.getOfficialAccountApplyUserCount());
-            // 公众号内填单的独立用户数/公众号关注次数
-            result.setOfficialAccountApplyRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountApplyUserCountDay() / result.getOfficialAccountFollowCountDay());
-            // 广告花费/广告产生的填单行为数量
-            result.setOfficialAccountApplyCostDay(result.getOfficialAccountApplyUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountApplyUserCountDay());
             result.setOfficialAccountApplyAmountDay(result.getOfficialAccountApplyAmountDay() + todayODS.getOfficialAccountApplyAmount());
-            // 填单产生的订单金额累计/广告花费
-            result.setOfficialAccountApplyRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountApplyAmountDay() / result.getCostDay());
             result.setOfficialAccountOrderCountDay(result.getOfficialAccountOrderCountDay() + todayODS.getOfficialAccountOrderCount());
             result.setOfficialAccountFirstDayOrderCountDay(result.getOfficialAccountFirstDayOrderCountDay() + todayODS.getOfficialAccountFirstDayOrderCount());
             result.setOfficialAccountOrderUserCountDay(result.getOfficialAccountOrderUserCountDay() + todayODS.getOfficialAccountOrderUserCount());
-            // 公众号内下单独立用户数(UV)/公众号关注次数
-            result.setOfficialAccountOrderRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountOrderUserCountDay() / result.getOfficialAccountFollowCountDay());
-            // 广告花费/广告产生的下单行为数量
-            result.setOfficialAccountOrderCostDay(result.getOfficialAccountOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountOrderCountDay());
             result.setOfficialAccountOrderAmountDay(result.getOfficialAccountOrderAmountDay() + todayODS.getOfficialAccountOrderAmount());
             result.setOfficialAccountFirstDayOrderAmountDay(result.getOfficialAccountFirstDayOrderAmountDay() + todayODS.getOfficialAccountFirstDayOrderAmount());
-            // 下单产生的订单金额累计/广告花费
-            result.setOfficialAccountOrderRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountOrderAmountDay() / result.getCostDay());
             result.setOfficialAccountConsultCountDay(result.getOfficialAccountConsultCountDay() + todayODS.getOfficialAccountConsultCount());
             result.setOfficialAccountReaderCountDay(result.getOfficialAccountReaderCountDay() + todayODS.getOfficialAccountReaderCount());
             result.setOfficialAccountCreditApplyUserCountDay(result.getOfficialAccountCreditApplyUserCountDay() + todayODS.getOfficialAccountCreditApplyUserCount());
@@ -1903,64 +1851,91 @@ public class AdStatOfMinuteDWD {
             result.setForwardUserCountDay(result.getForwardUserCountDay() + todayODS.getForwardUserCount());
             result.setNoInterestCountDay(result.getNoInterestCountDay() + todayODS.getNoInterestCount());
         }
+        // 总消耗 / 总曝光
+        result.setThousandDisplayPriceDay(result.getViewCountDay() == 0 ? 0 : (result.getCostDay() / result.getViewCountDay() * 1000));
+        // 广告点击次数 / 广告曝光次数
+        result.setCtrDay(result.getViewCountDay() == 0 ? 0.0 : result.getValidClickCountDay() / result.getViewCountDay());
+        // 广告花费/广告点击次数
+        result.setCpcDay(result.getValidClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValidClickCountDay());
+        // 广告可转化点击次数/广告曝光次数
+        result.setValuableClickRateDay(result.getViewCountDay() == 0 ? 0.0 : result.getValuableClickCountDay() / result.getViewCountDay());
+        // 广告花费/可转化点击次数
+        result.setValuableClickCostDay(result.getValuableClickCountDay() == 0 ? 0 : result.getCostDay() / result.getValuableClickCountDay());
+        // 广告花费/转化目标量
+        result.setConversionsCostDay(result.getConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getConversionsCountDay());
+        // 公众号:转化目标量/点击次数。
+        result.setConversionsRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getConversionsCountDay() / result.getValidClickCountDay());
+        // 广告花费/深度转化目标量
+        result.setDeepConversionsCostDay(result.getDeepConversionsCountDay() == 0 ? 0 : result.getCostDay() / result.getDeepConversionsCountDay());
+        // 深度转化目标量/可转化点击次数
+        result.setDeepConversionsRateDay(result.getValuableClickCountDay() == 0 ? 0.0 : result.getDeepConversionsCountDay() / result.getValuableClickCountDay());
+        // 广告花费/下单量
+        result.setWebOrderCostDay(result.getOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOrderCountDay());
+        // 下单量/点击次数
+        result.setOrderRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOrderCountDay() / result.getValidClickCountDay());
+        // 下单金额/下单量
+        result.setOrderUnitPriceDay(result.getOrderCountDay() == 0 ? 0 : result.getOrderAmountDay() / result.getOrderCountDay());
+        // 下单金额/广告花费
+        result.setOrderRoiDay(result.getCostDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getCostDay());
+        // 广告消耗 / 小游戏注册人数
+        result.setWechatMinigameRegisterCostDay(result.getWechatAppRegisterUvDay() == 0 ? 0 : result.getCostDay() / result.getWechatAppRegisterUvDay());
+        // 小游戏注册人数 / 广告点击次数
+        result.setWechatMinigameRegisterRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getWechatAppRegisterUvDay() / result.getValidClickCountDay());
+        // 总收益 / 总人数
+        result.setWechatMinigameArpuDay(result.getWechatAppRegisterUvDay() == 0 ? 0.0 : result.getOrderAmountDay() / result.getWechatAppRegisterUvDay());
+        // 关注次数 / 点击次数
+        result.setOfficialAccountFollowRateDay(result.getValidClickCountDay() == 0 ? 0.0 : result.getOfficialAccountFollowCountDay() / result.getValidClickCountDay());
+        // 公众号内注册人数 / 公众号关注次数
+        result.setOfficialAccountRegisterRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountRegisterUserCountDay() / result.getOfficialAccountFollowCountDay());
+        // 广告消耗 / 广告注册人数
+        result.setOfficialAccountRegisterCostDay(result.getOfficialAccountRegisterUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountRegisterUserCountDay());
+        // 注册产生的订单金额累计/广告花费
+        result.setOfficialAccountRegisterRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountRegisterAmountDay() / result.getCostDay());
+        // 公众号内填单的独立用户数/公众号关注次数
+        result.setOfficialAccountApplyRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountApplyUserCountDay() / result.getOfficialAccountFollowCountDay());
+        // 广告花费/广告产生的填单行为数量
+        result.setOfficialAccountApplyCostDay(result.getOfficialAccountApplyUserCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountApplyUserCountDay());
+        // 填单产生的订单金额累计/广告花费
+        result.setOfficialAccountApplyRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountApplyAmountDay() / result.getCostDay());
+        // 公众号内下单独立用户数(UV)/公众号关注次数
+        result.setOfficialAccountOrderRateDay(result.getOfficialAccountFollowCountDay() == 0 ? 0.0 : result.getOfficialAccountOrderUserCountDay() / result.getOfficialAccountFollowCountDay());
+        // 广告花费/广告产生的下单行为数量
+        result.setOfficialAccountOrderCostDay(result.getOfficialAccountOrderCountDay() == 0 ? 0 : result.getCostDay() / result.getOfficialAccountOrderCountDay());
+        // 下单产生的订单金额累计/广告花费
+        result.setOfficialAccountOrderRoiDay(result.getCostDay() == 0 ? 0 : result.getOfficialAccountOrderAmountDay() / result.getCostDay());
+
         // 填充总数据
         if (yesterdayDayDWD != null) {
             result.setCostDeviationRateTotal(yesterdayDayDWD.getCostDeviationRateTotal());
             result.setCostTotal(yesterdayDayDWD.getCostTotal());
             result.setCompensationAmountTotal(yesterdayDayDWD.getCompensationAmountTotal());
             result.setViewCountTotal(yesterdayDayDWD.getViewCountTotal());
-            result.setThousandDisplayPriceAll(yesterdayDayDWD.getThousandDisplayPriceAll());
             result.setAvgViewPerUserAll(yesterdayDayDWD.getAvgViewPerUserAll());
             result.setValidClickCountTotal(yesterdayDayDWD.getValidClickCountTotal());
-            result.setCtrAll(yesterdayDayDWD.getCtrAll());
-            result.setCpcAll(yesterdayDayDWD.getCpcAll());
             result.setValuableClickCountTotal(yesterdayDayDWD.getValuableClickCountTotal());
-            result.setValuableClickRateAll(yesterdayDayDWD.getValuableClickRateAll());
-            result.setValuableClickCostAll(yesterdayDayDWD.getValuableClickCostAll());
             result.setConversionsCountTotal(yesterdayDayDWD.getConversionsCountTotal());
-            result.setConversionsCostAll(yesterdayDayDWD.getConversionsCostAll());
-            result.setConversionsRateAll(yesterdayDayDWD.getConversionsRateAll());
             result.setDeepConversionsCountTotal(yesterdayDayDWD.getDeepConversionsCountTotal());
-            result.setDeepConversionsCostAll(yesterdayDayDWD.getDeepConversionsCostAll());
-            result.setDeepConversionsRateAll(yesterdayDayDWD.getDeepConversionsRateAll());
             result.setOrderCountTotal(yesterdayDayDWD.getOrderCountTotal());
             result.setFirstDayOrderCountTotal(yesterdayDayDWD.getFirstDayOrderCountTotal());
-            result.setWebOrderCostAll(yesterdayDayDWD.getWebOrderCostAll());
-            result.setOrderRateAll(yesterdayDayDWD.getOrderRateAll());
             result.setOrderAmountTotal(yesterdayDayDWD.getOrderAmountTotal());
             result.setFirstDayOrderAmountTotal(yesterdayDayDWD.getFirstDayOrderAmountTotal());
-            result.setOrderUnitPriceAll(yesterdayDayDWD.getOrderUnitPriceAll());
-            result.setOrderRoiAll(yesterdayDayDWD.getOrderRoiAll());
             result.setSignInCountTotal(yesterdayDayDWD.getSignInCountTotal());
             result.setScanFollowCountTotal(yesterdayDayDWD.getScanFollowCountTotal());
             result.setWechatAppRegisterUvTotal(yesterdayDayDWD.getWechatAppRegisterUvTotal());
-            result.setWechatMinigameRegisterCostAll(yesterdayDayDWD.getWechatMinigameRegisterCostAll());
-            result.setWechatMinigameRegisterRateAll(yesterdayDayDWD.getWechatMinigameRegisterRateAll());
-            result.setWechatMinigameArpuAll(yesterdayDayDWD.getWechatMinigameArpuAll());
             result.setWechatMinigameRetentionCountTotal(yesterdayDayDWD.getWechatMinigameRetentionCountTotal());
             result.setWechatMinigameCheckoutCountTotal(yesterdayDayDWD.getWechatMinigameCheckoutCountTotal());
             result.setWechatMinigameCheckoutAmountTotal(yesterdayDayDWD.getWechatMinigameCheckoutAmountTotal());
             result.setOfficialAccountFollowCountTotal(yesterdayDayDWD.getOfficialAccountFollowCountTotal());
-            result.setOfficialAccountFollowRateAll(yesterdayDayDWD.getOfficialAccountFollowRateAll());
             result.setOfficialAccountRegisterUserCountTotal(yesterdayDayDWD.getOfficialAccountRegisterUserCountTotal());
-            result.setOfficialAccountRegisterRateAll(yesterdayDayDWD.getOfficialAccountRegisterRateAll());
-            result.setOfficialAccountRegisterCostAll(yesterdayDayDWD.getOfficialAccountRegisterCostAll());
             result.setOfficialAccountRegisterAmountTotal(yesterdayDayDWD.getOfficialAccountRegisterAmountTotal());
-            result.setOfficialAccountRegisterRoiAll(yesterdayDayDWD.getOfficialAccountRegisterRoiAll());
             result.setOfficialAccountApplyCountTotal(yesterdayDayDWD.getOfficialAccountApplyCountTotal());
             result.setOfficialAccountApplyUserCountTotal(yesterdayDayDWD.getOfficialAccountApplyUserCountTotal());
-            result.setOfficialAccountApplyRateAll(yesterdayDayDWD.getOfficialAccountApplyRateAll());
-            result.setOfficialAccountApplyCostAll(yesterdayDayDWD.getOfficialAccountApplyCostAll());
             result.setOfficialAccountApplyAmountTotal(yesterdayDayDWD.getOfficialAccountApplyAmountTotal());
-            result.setOfficialAccountApplyRoiAll(yesterdayDayDWD.getOfficialAccountApplyRoiAll());
             result.setOfficialAccountOrderCountTotal(yesterdayDayDWD.getOfficialAccountOrderCountTotal());
             result.setOfficialAccountFirstDayOrderCountTotal(yesterdayDayDWD.getOfficialAccountFirstDayOrderCountTotal());
             result.setOfficialAccountOrderUserCountTotal(yesterdayDayDWD.getOfficialAccountOrderUserCountTotal());
-            result.setOfficialAccountOrderRateAll(yesterdayDayDWD.getOfficialAccountOrderRateAll());
-            result.setOfficialAccountOrderCostAll(yesterdayDayDWD.getOfficialAccountOrderCostAll());
             result.setOfficialAccountOrderAmountTotal(yesterdayDayDWD.getOfficialAccountOrderAmountTotal());
             result.setOfficialAccountFirstDayOrderAmountTotal(yesterdayDayDWD.getOfficialAccountFirstDayOrderAmountTotal());
-            result.setOfficialAccountOrderRoiAll(yesterdayDayDWD.getOfficialAccountOrderRoiAll());
             result.setOfficialAccountConsultCountTotal(yesterdayDayDWD.getOfficialAccountConsultCountTotal());
             result.setOfficialAccountReaderCountTotal(yesterdayDayDWD.getOfficialAccountReaderCountTotal());
             result.setOfficialAccountCreditApplyUserCountTotal(yesterdayDayDWD.getOfficialAccountCreditApplyUserCountTotal());

+ 38 - 18
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -112,40 +112,60 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             }
         }
 
-        // 重新聚合上小时的数据
-        if (hour > 0 && (beginDateTime.getMinute() == 0 || beginDateTime.getMinute() == 15 || beginDateTime.getMinute() == 55)) {
+        if (thisHourMinuteODS == null && hour > 0) {
+            int minHour = hour;
             AdDataOfMinuteODS lastHourMinuteODS = null;
             List<AdDataOfMinuteODS> lastHourMinuteODSList = new ArrayList<>(23);
             for (AdDataOfMinuteODS minuteODS : thisHourMinuteODSList) {
                 if (minuteODS.getHour() >= hour) {
                     continue;
                 }
+                minHour = Math.min(minHour, minuteODS.getHour());
                 lastHourMinuteODSList.add(minuteODS);
-                if (minuteODS.getHour() == hour - 1) {
+                if (lastHourMinuteODS == null) {
                     lastHourMinuteODS = minuteODS;
+                } else {
+                    lastHourMinuteODS = minuteODS.getHour() > lastHourMinuteODS.getHour() ? minuteODS : lastHourMinuteODS;
                 }
             }
-            if (thisHourMinuteODS == null && lastHourMinuteODS != null) {
-                LocalDateTime lastHourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour - 1, 55, 0));
-                lastHourMinuteODS.setStatTime(DateUtil.localDateTimeToMilli(lastHourStatTime));
-                AdStatOfMinuteDWD lastReduce = lastReduceState.get(lastHourStatTime.minusMinutes(5L).format(formatForLastReduceKey));
-                // 聚合当天的全部数据
-                AdStatOfMinuteDWD lastHourAdStat = AdStatOfMinuteDWD.reduce(yesterdayDayDWD, lastHourMinuteODSList, lastHourMinuteODS, lastReduce, now);
-                collector.collect(lastHourAdStat);
-
+            if (lastHourMinuteODS != null) {
+                if (lastHourMinuteODS.getHour() == hour - 1) {
+                    // 重新聚合上小时的数据
+                    if (beginDateTime.getMinute() == 0 || beginDateTime.getMinute() == 15 || beginDateTime.getMinute() == 55) {
+                        LocalDateTime lastHourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour - 1, 55, 0));
+                        lastHourMinuteODS.setStatTime(DateUtil.localDateTimeToMilli(lastHourStatTime));
+                        AdStatOfMinuteDWD lastReduce = lastReduceState.get(lastHourStatTime.minusMinutes(5L).format(formatForLastReduceKey));
+                        // 聚合当天的全部数据
+                        AdStatOfMinuteDWD lastHourAdStat = AdStatOfMinuteDWD.reduce(yesterdayDayDWD, lastHourMinuteODSList, lastHourMinuteODS, lastReduce, now);
+                        collector.collect(lastHourAdStat);
+                    }
+                }
                 // 造一条空数据
-                if (beginDateTime.getMinute() == 0) {
+                if (beginDateTime.getMinute() == 0 && minHour == lastHourMinuteODS.getHour()) {
                     LocalDateTime hourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour, 0, 0));
-                    AdStatOfMinuteDWD hourTemp = new AdStatOfMinuteDWD();
-                    BeanUtils.copyProperties(lastHourAdStat, hourTemp);
-                    hourTemp.setHour(hour);
-                    hourTemp.setStatTime(DateUtil.localDateTimeToMilli(hourStatTime));
-                    AdStatOfMinuteDWD.resetHourAndMinute(hourTemp);
-                    collector.collect(hourTemp);
+                    // 找今天聚合的最后一条历史数据
+                    AdStatOfMinuteDWD temp = null;
+                    for (int i = hour - 1; i >= 0; i++) {
+                        temp = lastReduceState.get(LocalDateTime.of(beginDate, LocalTime.of(i, 55, 0)).format(formatForLastReduceKey));
+                        if (temp != null) {
+                            break;
+                        }
+                    }
+                    if (temp != null) {
+                        AdStatOfMinuteDWD hourTemp = new AdStatOfMinuteDWD();
+                        BeanUtils.copyProperties(temp, hourTemp);
+                        hourTemp.setHour(hour);
+                        hourTemp.setStatTime(DateUtil.localDateTimeToMilli(hourStatTime));
+                        AdStatOfMinuteDWD.resetHourAndMinute(hourTemp);
+                        collector.collect(hourTemp);
+                    } else {
+                        log.error("造数据时没有找到历史数据, adId: {}, hour: {}, minHour: {}", adId, hour, minHour);
+                    }
                 }
             }
         }
 
+
         if (thisHourMinuteODS != null) {
             AdStatOfMinuteDWD lastReduce = lastReduceState.get(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey));
             // 聚合当天的全部数据

+ 3 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/AdMinuteODSStreamTrigger.java

@@ -23,13 +23,9 @@ public class AdMinuteODSStreamTrigger extends Trigger<AdDataOfMinuteODS, TimeWin
         if (adDataOfMinuteODS.getHour() == beginDateTime.getHour()) {
             return TriggerResult.FIRE;
         }
-        // 回滚上小时的数据
-        if (beginDateTime.getHour() > 0 && (adDataOfMinuteODS.getHour() == beginDateTime.getHour() - 1)) {
-            int minute = beginDateTime.getMinute();
-            // 分别在 5分钟、20分钟、1小时后重新计算上小时的数据
-            if (minute == 0 || minute == 15 || minute == 55) {
-                return TriggerResult.FIRE;
-            }
+        // 用来做数据回滚及预生成数据
+        if(beginDateTime.getHour() > 0) {
+            return TriggerResult.FIRE;
         }
         return TriggerResult.CONTINUE;
     }