瀏覽代碼

增加补数据的逻辑

wcc 3 年之前
父節點
當前提交
87a42ae4cb

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -96,7 +96,7 @@ public class AdHourStreamJob {
         // 分钟流-计算
         SingleOutputStreamOperator<AdStatOfMinuteDWD> adMinuteDWDStream = adMinuteODSStream
                 // 打水印,允许数据延迟 5分钟,同时指定时间流
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofDays(5L))
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(5L))
                         .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adODS, l) -> adODS.getStatTime()))
                 .keyBy(AdDataOfMinuteODS::getAdId)
                 // 开一个 5分钟的滚动窗口

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -79,7 +79,7 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
                 List<AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60);
                 historyReduceState.clear();
                 historyReduceState.update(historyData);
-                lastQueryTimeState.update(lastQueryTime);
+                lastQueryTimeState.update(now);
             }
         }
         AdStatOfDayDWD yesterdayReduceData = null;

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -137,7 +137,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
                 if (beginDateTime.getMinute() == 0) {
                     LocalDateTime hourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour, 0, 0));
                     AdStatOfMinuteDWD hourTemp = new AdStatOfMinuteDWD();
-                    BeanUtils.copyProperties(lastReduce, hourTemp);
+                    BeanUtils.copyProperties(lastHourAdStat, hourTemp);
                     hourTemp.setHour(hour);
                     hourTemp.setStatTime(DateUtil.localDateTimeToMilli(hourStatTime));
                     AdStatOfMinuteDWD.resetHourAndMinute(hourTemp);

+ 3 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -84,7 +84,9 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
             hourMapping.put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
             historyReduceState.put(adStatOfHourDWD.getStatDay(), hourMapping);
         } else {
-            historyReduceState.get(adStatOfHourDWD.getStatDay()).put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
+            Map<Integer, AdStatOfHourDWD> hourMapping = historyReduceState.get(adStatOfHourDWD.getStatDay());
+            hourMapping.put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
+            historyReduceState.put(adStatOfHourDWD.getStatDay(), hourMapping);
         }
 
         CostHourDM costHourDM = dataChange(adStatOfHourDWD);