Quellcode durchsuchen

优化延迟消息的逻辑

wcc vor 3 Jahren
Ursprung
Commit
040ab61b06

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -98,7 +98,7 @@ public class AdHourStreamJob {
         // 分钟流-计算
         SingleOutputStreamOperator<AdStatOfMinuteDWD> adMinuteDWDStream = adMinuteODSStream
                 // 打水印,允许数据延迟 5分钟,同时指定时间流
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(5L))
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofDays(1L))
                         .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adODS, l) -> adODS.getStatTime()))
                 .keyBy(AdDataOfMinuteODS::getAdId)
                 // 开一个 5分钟的滚动窗口

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/AdMinuteODSStreamTrigger.java

@@ -21,7 +21,7 @@ public class AdMinuteODSStreamTrigger extends Trigger<AdDataOfMinuteODS, TimeWin
     public TriggerResult onElement(AdDataOfMinuteODS adDataOfMinuteODS, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
         LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(timeWindow.getStart());
         if (adDataOfMinuteODS.getHour() == beginDateTime.getHour()) {
-            return TriggerResult.FIRE;
+            return TriggerResult.FIRE_AND_PURGE;
         }
         // 用来做数据回滚及预生成数据
         if(beginDateTime.getHour() > 0) {
@@ -44,7 +44,7 @@ public class AdMinuteODSStreamTrigger extends Trigger<AdDataOfMinuteODS, TimeWin
      */
     @Override
     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
-        return TriggerResult.FIRE_AND_PURGE;
+        return TriggerResult.PURGE;
     }
 
     @Override