Sfoglia il codice sorgente

完善 state的超时清理逻辑

root 3 anni fa
parent
commit
0ff554e4a3
26 ha cambiato i file con 150 aggiunte e 1058 eliminazioni
  1. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayODSStreamJob.java
  2. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java
  3. 2 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourODSStreamJob.java
  4. 2 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java
  5. 0 123
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java
  6. 0 115
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java
  7. 38 43
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java
  8. 9 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java
  9. 5 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollYearProcess.java
  10. 2 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java
  11. 11 4
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java
  12. 5 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java
  13. 12 4
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java
  14. 5 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java
  15. 0 101
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java
  16. 0 29
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollYearProcess.java
  17. 0 63
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDTOStreamProcess.java
  18. 0 115
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java
  19. 0 197
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourStreamCompletionProcess.java
  20. 0 172
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java
  21. 0 75
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/PlanDayDWDToCkBatchSink.java
  22. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/BatchStream.java
  23. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/KeyedBatchStream.java
  24. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/AdMinuteODSStreamTrigger.java
  25. 11 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/TimerCountTrigger.java
  26. 43 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/util/StateTtlUtil.java

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayODSStreamJob.java

@@ -71,7 +71,7 @@ public class AdDayODSStreamJob {
                 .map(AdStatOfDayODSDTO::byJson).setParallelism(3);
 
         // 写入原始表
-        new KeyedBatchStream<>(adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).setParallelism(3), AdDataOfDayODS::getStatDay, 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).setParallelism(3), AdDataOfDayODS::getStatDay, 3000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(3)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class))

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -117,7 +117,7 @@ public class AdDayStreamJob {
 
         DataStream<AdStatOfDayDWD> adDayStream = adDayDWDMonthStream.union(adDayDWDYearStream);
         // 写入 maxCompute
-        new KeyedBatchStream<>(adDayStream, AdStatOfDayDWD::getStatDay, 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adDayStream, AdStatOfDayDWD::getStatDay, 3000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(6)
                 .addSink(new OssBatchStreamSink<>(AdStatOfDayDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("ad_stat_of_day_dwd")))

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourODSStreamJob.java

@@ -85,7 +85,7 @@ public class AdHourODSStreamJob {
         // 分钟流
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
         // 分钟流-写入原始表
-        new KeyedBatchStream<>(adMinuteODSStream, AdDataOfMinuteODS::getStatDay, 6000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adMinuteODSStream, AdDataOfMinuteODS::getStatDay, 6000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(6)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
@@ -94,7 +94,7 @@ public class AdHourODSStreamJob {
         // 小时流
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 小时流-写入原始表
-        new KeyedBatchStream<>(adHourODSStream, AdDataOfHourODS::getStatDay, 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adHourODSStream, AdDataOfHourODS::getStatDay, 3000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(3)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -111,7 +111,7 @@ public class AdHourStreamJob {
                 .trigger(new AdMinuteODSStreamTrigger())
                 .process(new AdMinuteDWDProcess())
                 .setParallelism(8);
-        new KeyedBatchStream<>(adMinuteDWDStream, AdStatOfMinuteDWD::getStatDay, 5000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adMinuteDWDStream, AdStatOfMinuteDWD::getStatDay, 5000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(8)
                 .addSink(new OssBatchStreamSink<>(AdStatOfMinuteDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("ad_stat_of_minute_dwd")))
@@ -139,7 +139,7 @@ public class AdHourStreamJob {
                         .process(new AdHourDWDProcess());
 
         // 小时流-写入maxCompute
-        new KeyedBatchStream<>(adHourDWDStream, AdStatOfHourDWD::getStatDay, 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adHourDWDStream, AdStatOfHourDWD::getStatDay, 3000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(8)
                 .addSink(new OssBatchStreamSink<>(AdStatOfHourDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("ad_stat_of_hour_dwd")))

+ 0 - 123
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -1,123 +0,0 @@
-package flink.zanxiangnet.ad.monitoring;
-
-import com.zanxiangnet.module.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import flink.zanxiangnet.ad.monitoring.process.PlanDayDWDRollMonthProcess;
-import flink.zanxiangnet.ad.monitoring.process.PlanDayDWDRollYearProcess;
-import flink.zanxiangnet.ad.monitoring.sink.OssBatchStreamSink;
-import flink.zanxiangnet.ad.monitoring.sink.PlanDayDWDToCkBatchSink;
-import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
-import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-import java.time.LocalDate;
-import java.util.Properties;
-
-@Slf4j
-public class PlanDayStreamJob {
-    public static void main(String[] args) throws Exception {
-        boolean isTest = false;
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // 加载配置文件到 flink的全局配置中
-        Properties props = new Properties();
-        props.load(AdHourStreamJob.class.getResourceAsStream(isTest ? "/application.test.properties" : "/application.properties"));
-        Configuration configuration = new Configuration();
-        props.stringPropertyNames().forEach(key -> {
-            String value = props.getProperty(key);
-            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
-        });
-        env.getConfig().setGlobalJobParameters(configuration);
-
-        // checkpoint配置
-        env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
-        // checkpoint执行超时时间,超时则 checkpoint失败
-        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
-        // checkpoint执行最小间隔时间
-        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
-        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
-        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
-        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
-        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
-        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
-        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-        // 大概是允许 checkpoint失败几次,默认 0
-        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
-        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
-        if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT))) {
-            env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
-        }
-
-        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planDayConsumerGroup);
-        DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "planDaySource_kafka");
-
-        // 广告日数据。往前回滚 10天
-        final OutputTag<AdDataOfDayODS> adDayStreamRollDayTag = new OutputTag<AdDataOfDayODS>("adDayStreamRollDayTag") {
-        };
-        // 广告日数据。往前回滚 1年
-        final OutputTag<AdDataOfDayODS> adDayStreamRollYearTag = new OutputTag<AdDataOfDayODS>("adDayStreamRollYearTag") {
-        };
-
-        // 天数据
-        SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStream = adStreamOfDayIn.filter(StringUtils::isNotBlank)
-                .map(AdStatOfDayODSDTO::byJson);
-
-        // 天数据-拆分流
-        SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStreamSplit = adDayODSStream.process(new ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>() {
-            @Override
-            public void processElement(AdStatOfDayODSDTO adDataDTO, ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>.Context context, Collector<AdStatOfDayODSDTO> collector) throws Exception {
-                LocalDate startDate = adDataDTO.getStartDate();
-                LocalDate endDate = adDataDTO.getEndDate();
-                AdDataOfDayODS adODS = adDataDTO.getAdDataOfDayODS();
-                if (DateUtil.intervalOfDays(startDate, endDate) > 31L) {
-                    // 拉取时间间隔超过 1个月,账号回滚 365天的数据
-                    context.output(adDayStreamRollYearTag, adODS);
-                } else {
-                    // 每日往前回滚 10天的数据
-                    context.output(adDayStreamRollDayTag, adODS);
-                }
-            }
-        });
-
-        // 每日回滚 30天
-        SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDMonthStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
-                .keyBy(AdDataOfDayODS::getCampaignId)
-                .process(new PlanDayDWDRollMonthProcess());
-
-        // 往前回滚 365天
-        SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
-                .keyBy(AdDataOfDayODS::getCampaignId)
-                .process(new PlanDayDWDRollYearProcess());
-
-        DataStream<PlanStatOfDayDWD> planDayDWDStream = planDayDWDMonthStream.union(planDayDWDYearStream);
-        // 写入 maxCompute
-        new KeyedBatchStream<>(planDayDWDStream, PlanStatOfDayDWD::getStatDay, 4000L, Time.minutes(1L))
-                .toBatch()
-                .addSink(new OssBatchStreamSink<>(PlanStatOfDayDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("plan_stat_of_day_dwd")))
-                .name("sink_plan_year_dwd");
-        // 写入 ck
-        new BatchStream<>(planDayDWDStream, 1000L, Time.minutes(1L)).toBatch().addSink(new PlanDayDWDToCkBatchSink());
-
-        env.execute(isTest ? "plan_day_stream_job_test" : "plan_day_stream_job");
-    }
-}

+ 0 - 115
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java

@@ -1,115 +0,0 @@
-package flink.zanxiangnet.ad.monitoring;
-
-import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import flink.zanxiangnet.ad.monitoring.process.PlanHourDTOStreamProcess;
-import flink.zanxiangnet.ad.monitoring.process.PlanHourDWDProcess;
-import flink.zanxiangnet.ad.monitoring.process.PlanHourStreamCompletionProcess;
-import flink.zanxiangnet.ad.monitoring.process.PlanMinuteDWDProcess;
-import flink.zanxiangnet.ad.monitoring.sink.OssBatchStreamSink;
-import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
-import flink.zanxiangnet.ad.monitoring.trigger.PlanMinuteODSStreamTrigger;
-import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.OutputTag;
-
-import java.time.Duration;
-import java.util.*;
-
-public class PlanHourStreamJob {
-
-    public static void main(String[] args) throws Exception {
-        boolean isTest = false;
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // 加载配置文件到 flink的全局配置中
-        Properties props = new Properties();
-        props.load(AdHourStreamJob.class.getResourceAsStream(isTest ? "/application.test.properties" : "/application.properties"));
-        Configuration configuration = new Configuration();
-        props.stringPropertyNames().forEach(key -> {
-            String value = props.getProperty(key);
-            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
-        });
-        env.getConfig().setGlobalJobParameters(configuration);
-
-        // checkpoint配置
-        env.enableCheckpointing(2 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
-        // checkpoint执行超时时间,超时则 checkpoint失败
-        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
-        // checkpoint执行最小间隔时间
-        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
-        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
-        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
-        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
-        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
-        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
-        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-        // 大概是允许 checkpoint失败几次,默认 0
-        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
-        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
-        if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT))) {
-            env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
-        }
-
-        KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planHourConsumerGroup);
-        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "planHourSource_kafka");
-
-        // 广告分钟数据(前 5分钟的广告消耗数据)
-        final OutputTag<AdDataOfMinuteODS> adMinuteStreamTag = new OutputTag<AdDataOfMinuteODS>("adMinuteStream") {
-        };
-        // 广告小时数据(往前回滚 10天)
-        final OutputTag<AdDataOfHourODS> adHourStreamTag = new OutputTag<AdDataOfHourODS>("adHourStream") {
-        };
-        // 计划小时数据(从分钟数据流切出来的整点数据)
-        final OutputTag<PlanStatOfHourDWD> planHourFromMinuteStreamTag = new OutputTag<PlanStatOfHourDWD>("planHourFromMinuteStream") {
-        };
-
-        // 对流进行映射,拆分(实时的分钟流和回滚的小时流)
-        SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn.filter(StringUtils::isNotBlank)
-                .process(new PlanHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag));
-
-        // 分钟流-计算
-        SingleOutputStreamOperator<PlanStatOfMinuteDWD> planMinuteDWDStream = adODSStream.getSideOutput(adMinuteStreamTag)
-                // 打水印,允许数据延迟 6分钟,同时指定时间流
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(6L))
-                        .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adODS, l) -> adODS.getStatTime()))
-                .keyBy(AdDataOfMinuteODS::getCampaignId)
-                // 开一个 5分钟的滚动窗口
-                .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
-                .trigger(new PlanMinuteODSStreamTrigger())
-                .process(new PlanMinuteDWDProcess(planHourFromMinuteStreamTag));
-        new KeyedBatchStream<>(planMinuteDWDStream, PlanStatOfMinuteDWD::getStatDay, 4000L, Time.minutes(1L))
-                .toBatch()
-                .addSink(new OssBatchStreamSink<>(PlanStatOfMinuteDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("plan_stat_of_minute_dwd")))
-                .name("sink_plan_minute_dwd");
-
-        // 小时流
-        SingleOutputStreamOperator<PlanStatOfHourDWD> planHourDWDStream = adODSStream.getSideOutput(adHourStreamTag).keyBy(AdDataOfHourODS::getCampaignId)
-                .process(new PlanHourDWDProcess());
-        DataStream<PlanStatOfHourDWD> planHourDWDAllStream = planMinuteDWDStream.getSideOutput(planHourFromMinuteStreamTag)
-                .keyBy(PlanStatOfHourDWD::getCampaignId)
-                .process(new PlanHourStreamCompletionProcess())
-                .union(planHourDWDStream);
-        new KeyedBatchStream<>(planHourDWDAllStream, PlanStatOfHourDWD::getStatDay, 4000L, Time.minutes(1L))
-                .toBatch()
-                .addSink(new OssBatchStreamSink<>(PlanStatOfHourDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("plan_stat_of_hour_dwd")))
-                .name("sink_plan_hour_dwd");
-
-        env.execute(isTest ? "plan_hour_stream_job_test" : "plan_hour_stream_job");
-    }
-}

+ 38 - 43
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -9,34 +9,24 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.state.*;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class Test {
@@ -59,21 +49,23 @@ public class Test {
         SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(3))
                 .withTimestampAssigner((SerializableTimestampAssigner<Pojo>) (pojo, l) -> pojo.getCreateTime())
         );
-        SingleOutputStreamOperator<Pojo> temp = pojoStream.keyBy(Pojo::getUserId).window(TumblingEventTimeWindows.of(Time.hours(24))).process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
-            @Override
-            public void process(Integer integer, ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>.Context context, Iterable<Pojo> elements, Collector<Pojo> out) throws Exception {
-                System.out.println("2222begin: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(context.window().getStart())) + " | end: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(context.window().getEnd())));
-                for(Pojo pojo : elements) {
-                    out.collect(pojo);
-                }
-            }
-        });
-        new KeyedBatchStream<>(pojoStream, Pojo::getUserId, 10L, Time.seconds(10))
+        /*SingleOutputStreamOperator<Pojo> temp = pojoStream.keyBy(Pojo::getUserId)
+                .window(TumblingEventTimeWindows.of(Time.hours(2)))
+                .process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
+                    @Override
+                    public void process(Integer integer, ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>.Context context, Iterable<Pojo> elements, Collector<Pojo> out) throws Exception {
+                        System.out.println("2222begin: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(context.window().getStart())) + " | end: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(context.window().getEnd())));
+                        for (Pojo pojo : elements) {
+                            out.collect(pojo);
+                        }
+                    }
+                });*/
+        new KeyedBatchStream<>(pojoStream, Pojo::getUserId, 10L, Time.seconds(16))
                 .toBatch()
                 .process(new ProcessFunction<List<Pojo>, String>() {
                     @Override
                     public void processElement(List<Pojo> value, ProcessFunction<List<Pojo>, String>.Context ctx, Collector<String> out) throws Exception {
-                        out.collect("收到 " + value.size() + "个元素!!!");
+                        out.collect("收到 " + value.size() + "个元素!" + value.stream().map(Pojo::getIndex).collect(Collectors.toList()));
                     }
                 }).print();
         /*pojoStream.keyBy(Pojo::getUserId)
@@ -262,34 +254,37 @@ public class Test {
         public void run(SourceContext<Pojo> sourceContext) {
             while (isRun) {
                 try {
-                    long user1Index = index1.incrementAndGet();
-                    Pojo pojo = Pojo.builder()
-                            .userId(1)
-                            .index(user1Index)
-                            .createTime(BEGIN + ((user1Index - 1) * 60 * 60 * 1000))
-                            .build();
-                    if (user1Index % 24 == 0) {
-                        // 模拟数据延迟,每天 24点的数据延迟 25秒
-                        threadPool.execute(() -> {
-                            try {
-                                Thread.sleep(2000);
-                            } catch (InterruptedException e) {
-                                e.printStackTrace();
-                            }
-                            log.error("延迟1发送:{}", pojo);
+                    for (int i = 0; i < 24; i++) {
+                        long user1Index = index1.incrementAndGet();
+                        Pojo pojo = Pojo.builder()
+                                .userId(1)
+                                .index(user1Index)
+                                .createTime(BEGIN + ((user1Index - 1) * 60 * 60 * 1000))
+                                .build();
+                        if (user1Index % 12 == 0) {
+                            // 模拟数据延迟,每天 24点的数据延迟 2秒
+                            threadPool.execute(() -> {
+                                try {
+                                    Thread.sleep(2000);
+                                } catch (InterruptedException e) {
+                                    e.printStackTrace();
+                                }
+                                log.error("延迟1发送:{}", pojo);
+                                sourceContext.collect(pojo);
+                            });
+                        } else {
+                            // log.error("1发送:{}", pojo);
                             sourceContext.collect(pojo);
-                        });
-                    } else {
-                        // log.error("1发送:{}", pojo);
-                        sourceContext.collect(pojo);
+                        }
                     }
+
                     /*long user2Index = index2.incrementAndGet();
                     Pojo pojo2 = Pojo.builder()
                             .userId(2)
                             .index(user2Index)
                             .createTime(BEGIN + ((user2Index - 1) * 60 * 60 * 1000))
                             .build();
-                    if (user2Index % 24 == 0) {
+                    if (user2Index % 12 == 0) {
                         // 模拟数据延迟,每天 24点的数据延迟 25秒
                         threadPool.execute(() -> {
                             try {
@@ -304,7 +299,7 @@ public class Test {
                         // log.error("2发送:{}", pojo2);
                         sourceContext.collect(pojo2);
                     }*/
-                    Thread.sleep(1000);
+                    Thread.sleep(24 * 1000);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }

+ 9 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java

@@ -6,11 +6,13 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import com.zanxiangnet.module.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
@@ -60,8 +62,13 @@ public class AdDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDataO
         configuration.addMapper(AdStatOfDayDWDMapper.class);
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", String.class, AdStatOfDayDWD.class));
+        ValueStateDescriptor<String> lastQueryDayStateDescriptor = new ValueStateDescriptor<>("lastQueryDayState", String.class);
+        lastQueryDayStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(25)));
+        lastQueryDayState = getRuntimeContext().getState(lastQueryDayStateDescriptor);
+
+        MapStateDescriptor<String, AdStatOfDayDWD> historyReduceStateDescriptor = new MapStateDescriptor<>("historyReduceState", String.class, AdStatOfDayDWD.class);
+        historyReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(25)));
+        historyReduceState = getRuntimeContext().getMapState(historyReduceStateDescriptor);
     }
 
     @Override

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollYearProcess.java

@@ -2,9 +2,11 @@ package flink.zanxiangnet.ad.monitoring.process;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
@@ -15,7 +17,9 @@ public class AdDayDWDRollYearProcess extends KeyedProcessFunction<Long, AdDataOf
     private ValueState<AdStatOfDayDWD> lastReduceState;
 
     public void open(Configuration conf) {
-        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", AdStatOfDayDWD.class));
+        ValueStateDescriptor<AdStatOfDayDWD> lastReduceStateDescriptor = new ValueStateDescriptor<>("lastReduceState", AdStatOfDayDWD.class);
+        lastReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(6)));
+        lastReduceState = getRuntimeContext().getState(lastReduceStateDescriptor);
     }
 
     @Override

+ 2 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java

@@ -59,7 +59,8 @@ public class AdDayOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lon
         configuration.addMapper(AdStatOfDayDWDMapper.class);
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+        ValueStateDescriptor<String> lastQueryDayStateDescriptor = new ValueStateDescriptor<>("lastQueryDayState", String.class);
+        lastQueryDayState = getRuntimeContext().getState(lastQueryDayStateDescriptor);
     }
 
     @Override

+ 11 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -7,8 +7,10 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import com.zanxiangnet.module.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.*;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -43,7 +45,6 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
                 .getGlobalJobParameters()
                 .toMap();
 
-
         Properties mysqlProps = new Properties();
         mysqlProps.setProperty(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME, params.get(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME));
         mysqlProps.setProperty(ApplicationProperties.MYSQL_URL, params.get(ApplicationProperties.MYSQL_URL));
@@ -63,9 +64,15 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
         configuration.addMapper(AdStatOfDayDWDMapper.class);
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
-        lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG));
-        historyReduceState = getRuntimeContext().getListState(new ListStateDescriptor<>("historyReduceState", Types.POJO(AdStatOfDayDWD.class)));
-        historyState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdDataOfHourODS.class))));
+        ValueStateDescriptor<Long> lastQueryTimeStateDescriptor = new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG);
+        lastQueryTimeStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(1)));
+        lastQueryTimeState = getRuntimeContext().getState(lastQueryTimeStateDescriptor);
+        ListStateDescriptor<AdStatOfDayDWD> historyReduceStateDescriptor = new ListStateDescriptor<>("historyReduceState", Types.POJO(AdStatOfDayDWD.class));
+        historyReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(1)));
+        historyReduceState = getRuntimeContext().getListState(historyReduceStateDescriptor);
+        MapStateDescriptor<String, Map<Integer, AdDataOfHourODS>> historyStateDescriptor = new MapStateDescriptor<>("historyState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdDataOfHourODS.class)));
+        historyStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(24)));
+        historyState = getRuntimeContext().getMapState(historyStateDescriptor);
     }
 
     @Override

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java

@@ -2,9 +2,11 @@ package flink.zanxiangnet.ad.monitoring.process;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import com.zanxiangnet.module.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -23,7 +25,9 @@ public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lo
 
     @Override
     public void open(Configuration conf) {
-        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class)));
+        ValueStateDescriptor<AdStatOfHourDWD> lastReduceStateDescriptor = new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class));
+        lastReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(24)));
+        lastReduceState = getRuntimeContext().getState(lastReduceStateDescriptor);
     }
 
     @Override

+ 12 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -5,8 +5,10 @@ import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import com.zanxiangnet.module.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.*;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -68,9 +70,15 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         configuration.addMapper(AdStatOfDayDWDMapper.class);
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
-        historyDayState = getRuntimeContext().getListState(new ListStateDescriptor<>("historyDayState", Types.POJO(AdStatOfDayDWD.class)));
-        lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG));
-        lastReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("lastReduceState", Types.STRING, Types.POJO(AdStatOfMinuteDWD.class)));
+        ListStateDescriptor<AdStatOfDayDWD> historyDayStateDescriptor = new ListStateDescriptor<>("historyDayState", Types.POJO(AdStatOfDayDWD.class));
+        historyDayStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(2)));
+        historyDayState = getRuntimeContext().getListState(historyDayStateDescriptor);
+        ValueStateDescriptor<Long> lastQueryTimeStateDescriptor = new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG);
+        lastQueryTimeStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(2)));
+        lastQueryTimeState = getRuntimeContext().getState(lastQueryTimeStateDescriptor);
+        MapStateDescriptor<String, AdStatOfMinuteDWD> lastReduceStateDescriptor = new MapStateDescriptor<>("lastReduceState", Types.STRING, Types.POJO(AdStatOfMinuteDWD.class));
+        lastReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(25)));
+        lastReduceState = getRuntimeContext().getMapState(lastReduceStateDescriptor);
     }
 
     @Override
@@ -95,7 +103,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
 
         // 每 2小时更新下之前的历史数据
         Long lastQueryTime = lastQueryTimeState.value();
-        if (lastQueryTime == null || (beginTime - lastQueryTime > 2 * 60 * 60 * 1000L)) {
+        if (lastQueryTime == null || (beginTime - lastQueryTime) > 2 * 60 * 60 * 1000L) {
             try (SqlSession session = sqlSessionFactory.openSession()) {
                 AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
                 List<AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(beginDate.minusDays(1L)), 6);

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -3,8 +3,10 @@ package flink.zanxiangnet.ad.monitoring.process;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import com.zanxiangnet.module.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.NumberUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -21,7 +23,9 @@ public class CostMinuteProcess extends KeyedProcessFunction<Long, AdStatOfMinute
 
     @Override
     public void open(Configuration conf) throws Exception {
-        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfMinuteDWD.class))));
+        MapStateDescriptor<String, Map<Integer, AdStatOfMinuteDWD>> historyReduceStateDescriptor = new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfMinuteDWD.class)));
+        historyReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(50)));
+        historyReduceState = getRuntimeContext().getMapState(historyReduceStateDescriptor);
     }
 
     @Override

+ 0 - 101
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java

@@ -1,101 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
-import flink.zanxiangnet.ad.monitoring.dao.mapper.PlanStatOfDayDWDMapper;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import com.zanxiangnet.module.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.PlanUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.util.Collector;
-import org.apache.ibatis.datasource.DataSourceFactory;
-import org.apache.ibatis.mapping.Environment;
-import org.apache.ibatis.session.SqlSession;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.apache.ibatis.session.SqlSessionFactoryBuilder;
-import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
-
-import java.time.LocalDate;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class PlanDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD> {
-    private SqlSessionFactory sqlSessionFactory;
-    // 上次查询的时间
-    private ValueState<String> lastQueryDayState;
-    // 之前聚合的昨天的数据
-    private MapState<String, PlanStatOfDayDWD> historyReduceState;
-
-    @Override
-    public void open(Configuration conf) throws Exception {
-        Map<String, String> params = getRuntimeContext()
-                .getExecutionConfig()
-                .getGlobalJobParameters()
-                .toMap();
-
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
-
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
-        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
-        // 开启驼峰规则
-        configuration.setMapUnderscoreToCamelCase(true);
-        configuration.getTypeAliasRegistry().registerAlias(PlanStatOfDayDWD.class);
-        // addMapper一定要放到 alias的后面!!!!!
-        configuration.addMapper(PlanStatOfDayDWDMapper.class);
-        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
-
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", String.class, PlanStatOfDayDWD.class));
-    }
-
-    @Override
-    public void processElement(AdDataOfDayODS element, KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD>.Context context, Collector<PlanStatOfDayDWD> collector) throws Exception {
-        LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-        long now = System.currentTimeMillis();
-        long adId = element.getAdId();
-
-        String lastQueryDay = lastQueryDayState.value();
-        // 从 maxCompute查找广告的历史数据
-        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
-            try (SqlSession session = sqlSessionFactory.openSession()) {
-                PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
-                Map<String, PlanStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60)
-                        .stream()
-                        .peek(value -> {
-                            value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
-                            value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
-                        })
-                        .collect(Collectors.toMap(PlanStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
-                historyReduceState.clear();
-                historyReduceState.putAll(historyData);
-                lastQueryDayState.update(DateUtil.formatLocalDate(LocalDate.now()));
-            }
-        }
-        PlanStatOfDayDWD lastReduceData = null;
-        for (int i = 1; i <= 60; i++) {
-            lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(statDay.minusDays(i)));
-            if (lastReduceData != null) {
-                break;
-            }
-        }
-        PlanStatOfDayDWD newStatData = PlanStatOfDayDWD.reduce(lastReduceData, element, now);
-        historyReduceState.put(DateUtil.formatLocalDate(statDay), newStatData);
-        collector.collect(newStatData);
-    }
-}

+ 0 - 29
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollYearProcess.java

@@ -1,29 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-
-@Slf4j
-public class PlanDayDWDRollYearProcess extends KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD> {
-
-    // 上次聚合的结果
-    private ValueState<PlanStatOfDayDWD> lastReduceState;
-
-    public void open(Configuration conf) {
-        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", PlanStatOfDayDWD.class));
-    }
-
-    @Override
-    public void processElement(AdDataOfDayODS element, KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD>.Context context,
-                               Collector<PlanStatOfDayDWD> collector) throws Exception {
-        PlanStatOfDayDWD newStatDWD = PlanStatOfDayDWD.reduce(lastReduceState.value(), element, System.currentTimeMillis());
-        collector.collect(newStatDWD);
-        lastReduceState.update(newStatDWD);
-    }
-}

+ 0 - 63
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDTOStreamProcess.java

@@ -1,63 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import com.tencent.ads.model.HourlyReportsGetListStruct;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfHourODS;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfMinuteODS;
-import com.zanxiangnet.module.util.DateUtil;
-import com.zanxiangnet.module.util.JsonUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.springframework.beans.BeanUtils;
-
-import java.util.Date;
-
-@Slf4j
-public class PlanHourDTOStreamProcess extends ProcessFunction<String, AdDataOfMinuteODS> {
-
-    // 广告分钟数据(前 5分钟的广告消耗数据)
-    private final OutputTag<AdDataOfMinuteODS> adMinuteStreamTag;
-    // 广告小时数据(往前回滚 10天)
-    private final OutputTag<AdDataOfHourODS> adHourStreamTag;
-
-    public PlanHourDTOStreamProcess(OutputTag<AdDataOfMinuteODS> adMinuteStreamTag, OutputTag<AdDataOfHourODS> adHourStreamTag) {
-        this.adMinuteStreamTag = adMinuteStreamTag;
-        this.adHourStreamTag = adHourStreamTag;
-    }
-
-    @Override
-    public void processElement(String adDataStr, ProcessFunction<String, AdDataOfMinuteODS>.Context context, Collector<AdDataOfMinuteODS> collector) throws Exception {
-        AdDataOfMinuteDTO dto = JsonUtil.toObj(adDataStr, AdDataOfMinuteDTO.class);
-        // 指记录被创建的时间
-        long createTime = dto.getCreateTime();
-        // 指记录统计的时间(如果是实时统计的数据精确到分钟,回滚的历史数据精确到天)
-        long statTime = DateUtil.localDateTimeToMilli(DateUtil.milliToLocalDateTime(dto.getDataTime()).withSecond(0).withNano(0));
-        HourlyReportsGetListStruct struct = dto.getHourlyReportsGetListStruct();
-
-        if (createTime - statTime > (60 * 60 * 1000L)) {
-            AdDataOfHourODS adODS = new AdDataOfHourODS();
-            BeanUtils.copyProperties(struct, adODS);
-            adODS.setStatDay(DateUtil.formatLocalDate(DateUtil.milliToLocalDate(statTime)));
-            adODS.setHour(dto.getHourlyReportsGetListStruct().getHour().intValue());
-            adODS.setStatTime(DateUtil.localDateTimeToMilli(DateUtil.milliToLocalDateTime(statTime).withMinute(0)));
-            adODS.setAccountId(dto.getAccountId());
-            adODS.setAgencyAccountId(dto.getHourlyReportsGetListStruct().getAccountId());
-            adODS.setCreateTime(new Date(createTime));
-            adODS.removeNull();
-            context.output(adHourStreamTag, adODS);
-        } else {
-            AdDataOfMinuteODS adODS = new AdDataOfMinuteODS();
-            BeanUtils.copyProperties(struct, adODS);
-            adODS.setStatDay(DateUtil.formatLocalDate(DateUtil.milliToLocalDate(statTime)));
-            adODS.setHour(dto.getHourlyReportsGetListStruct().getHour().intValue());
-            adODS.setStatTime(statTime);
-            adODS.setAccountId(dto.getAccountId());
-            adODS.setAgencyAccountId(dto.getHourlyReportsGetListStruct().getAccountId());
-            adODS.setCreateTime(new Date(createTime));
-            adODS.removeNull();
-            context.output(adMinuteStreamTag, adODS);
-        }
-    }
-}

+ 0 - 115
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java

@@ -1,115 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
-import flink.zanxiangnet.ad.monitoring.dao.mapper.PlanStatOfDayDWDMapper;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfHourODS;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfHourDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import com.zanxiangnet.module.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.PlanUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.util.Collector;
-import org.apache.ibatis.datasource.DataSourceFactory;
-import org.apache.ibatis.mapping.Environment;
-import org.apache.ibatis.session.SqlSession;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.apache.ibatis.session.SqlSessionFactoryBuilder;
-import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
-
-import java.time.LocalDate;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class PlanHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS, PlanStatOfHourDWD> {
-
-    private SqlSessionFactory sqlSessionFactory;
-    // 上次查询的天数据
-    private ValueState<String> lastQueryDayState;
-    // 聚合的天的数据
-    private MapState<String, PlanStatOfDayDWD> historyReduceState;
-    // 昨天集合的最后一次数据
-    private MapState<String, PlanStatOfHourDWD> lastReduceState;
-
-    @Override
-    public void open(Configuration conf) {
-        Map<String, String> params = getRuntimeContext()
-                .getExecutionConfig()
-                .getGlobalJobParameters()
-                .toMap();
-
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
-
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
-        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
-        // 开启驼峰规则
-        configuration.setMapUnderscoreToCamelCase(true);
-        configuration.getTypeAliasRegistry().registerAlias(PlanStatOfDayDWD.class);
-        // addMapper一定要放到 alias的后面!!!!!
-        configuration.addMapper(PlanStatOfDayDWDMapper.class);
-        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
-
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.POJO(PlanStatOfDayDWD.class)));
-        lastReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("lastReduceState", Types.STRING, Types.POJO(PlanStatOfHourDWD.class)));
-    }
-
-    @Override
-    public void processElement(AdDataOfHourODS element, KeyedProcessFunction<Long, AdDataOfHourODS, PlanStatOfHourDWD>.Context context,
-                               Collector<PlanStatOfHourDWD> collector) throws Exception {
-        LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-        long now = System.currentTimeMillis();
-        long adId = element.getAdId();
-        LocalDate today = LocalDate.now();
-
-        String lastQueryDay = lastQueryDayState.value();
-        // 从 maxCompute拉取指定 广告的历史数据
-        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
-            try (SqlSession session = sqlSessionFactory.openSession()) {
-                PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
-                Map<String, PlanStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(today.minusDays(1L)), 60)
-                        .stream()
-                        .peek(value -> {
-                            value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
-                            value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
-                        })
-                        .collect(Collectors.toMap(PlanStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
-                historyReduceState.clear();
-                historyReduceState.putAll(historyData);
-                lastQueryDayState.update(DateUtil.formatLocalDate(today));
-            }
-        }
-        PlanStatOfDayDWD yesterdayReduceData = null;
-        for (int i = 1; i < 60; i++) {
-            LocalDate day = statDay.minusDays(i);
-            yesterdayReduceData = historyReduceState.get(DateUtil.formatLocalDate(day));
-            if (yesterdayReduceData != null) {
-                break;
-            }
-        }
-
-        PlanStatOfHourDWD lastReduceData = lastReduceState.get(element.getStatDay());
-
-        PlanStatOfHourDWD newStatData = PlanStatOfHourDWD.reduce(yesterdayReduceData, lastReduceData, element, now);
-        lastReduceState.put(newStatData.getStatDay(), newStatData);
-        collector.collect(newStatData);
-
-        // lastReduceState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
-    }
-}

+ 0 - 197
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourStreamCompletionProcess.java

@@ -1,197 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfHourDWD;
-import com.zanxiangnet.module.util.DateUtil;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.springframework.beans.BeanUtils;
-
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.Date;
-
-/**
- * 小时流数据补全,把中间没消耗的时间段填充 0
- */
-public class PlanHourStreamCompletionProcess extends KeyedProcessFunction<Long, PlanStatOfHourDWD, PlanStatOfHourDWD> {
-
-    private ValueState<PlanStatOfHourDWD> lastReduceState;
-
-    @Override
-    public void open(Configuration conf) {
-        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(PlanStatOfHourDWD.class)));
-    }
-
-    @Override
-    public void processElement(PlanStatOfHourDWD planStatOfHourDWD, KeyedProcessFunction<Long, PlanStatOfHourDWD, PlanStatOfHourDWD>.Context context,
-                               Collector<PlanStatOfHourDWD> collector) throws Exception {
-        PlanStatOfHourDWD lastReduce = lastReduceState.value();
-        if (lastReduce == null) {
-            lastReduceState.update(planStatOfHourDWD);
-            collector.collect(planStatOfHourDWD);
-            return;
-        }
-        LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(planStatOfHourDWD.getStatDay()), LocalTime.of(planStatOfHourDWD.getHour(), 0, 0));
-        LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
-        if (lastStatDateTime.compareTo(statDateTime) >= 0) {
-            return;
-        }
-        long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
-        if (hours > 1) {
-            // 中间有没数据的时间段,需要进行数据填充
-            for (int i = 1; i < hours; i++) {
-                // 要填充的时间
-                LocalDateTime completionTime = lastStatDateTime.plusHours(i);
-                collector.collect(completion(completionTime, lastReduce));
-            }
-        }
-        collector.collect(planStatOfHourDWD);
-        lastReduceState.update(planStatOfHourDWD);
-    }
-
-    private PlanStatOfHourDWD completion(LocalDateTime completionTime, PlanStatOfHourDWD lastReduceData) {
-        String statDay = DateUtil.formatLocalDate(completionTime.toLocalDate());
-        int hour = completionTime.getHour();
-        PlanStatOfHourDWD result = new PlanStatOfHourDWD();
-        BeanUtils.copyProperties(lastReduceData, result);
-        result.setCreateTime(new Date());
-        result.setStatDay(statDay);
-        result.setHour(hour);
-        PlanStatOfHourDWD.initValue(result);
-
-        result.setCostDeviationRateTotal(lastReduceData.getCostDeviationRateTotal());
-        result.setCostTotal(lastReduceData.getCostTotal());
-        result.setCompensationAmountTotal(lastReduceData.getCompensationAmountTotal());
-        result.setViewCountTotal(lastReduceData.getViewCountTotal());
-        result.setThousandDisplayPriceAll(lastReduceData.getThousandDisplayPriceAll());
-        result.setValidClickCountTotal(lastReduceData.getValidClickCountTotal());
-        result.setCtrAll(lastReduceData.getCtrAll());
-        result.setCpcAll(lastReduceData.getCpcAll());
-        result.setValuableClickCountTotal(lastReduceData.getValuableClickCountTotal());
-        result.setValuableClickRateAll(lastReduceData.getValuableClickRateAll());
-        result.setValuableClickCostAll(lastReduceData.getValuableClickCostAll());
-        result.setConversionsCountTotal(lastReduceData.getConversionsCountTotal());
-        result.setConversionsCostAll(lastReduceData.getConversionsCostAll());
-        result.setConversionsRateAll(lastReduceData.getConversionsRateAll());
-        result.setDeepConversionsCountTotal(lastReduceData.getDeepConversionsCountTotal());
-        result.setDeepConversionsCostAll(lastReduceData.getDeepConversionsCostAll());
-        result.setDeepConversionsRateAll(lastReduceData.getDeepConversionsRateAll());
-        result.setOrderCountTotal(lastReduceData.getOrderCountTotal());
-        result.setFirstDayOrderCountTotal(lastReduceData.getFirstDayOrderCountTotal());
-        result.setWebOrderCostAll(lastReduceData.getWebOrderCostAll());
-        result.setOrderRateAll(lastReduceData.getOrderRateAll());
-        result.setOrderAmountTotal(lastReduceData.getOrderAmountTotal());
-        result.setFirstDayOrderAmountTotal(lastReduceData.getFirstDayOrderAmountTotal());
-        result.setOrderUnitPriceAll(lastReduceData.getOrderUnitPriceAll());
-        result.setOrderRoiAll(lastReduceData.getOrderRoiAll());
-        result.setSignInCountTotal(lastReduceData.getSignInCountTotal());
-        result.setScanFollowCountTotal(lastReduceData.getScanFollowCountTotal());
-        result.setWechatAppRegisterUvTotal(lastReduceData.getWechatAppRegisterUvTotal());
-        result.setWechatMinigameRegisterCostAll(lastReduceData.getWechatMinigameRegisterCostAll());
-        result.setWechatMinigameRegisterRateAll(lastReduceData.getWechatMinigameRegisterRateAll());
-        result.setWechatMinigameArpuAll(lastReduceData.getWechatMinigameArpuAll());
-        result.setWechatMinigameRetentionCountTotal(lastReduceData.getWechatMinigameRetentionCountTotal());
-        result.setWechatMinigameCheckoutCountTotal(lastReduceData.getWechatMinigameCheckoutCountTotal());
-        result.setWechatMinigameCheckoutAmountTotal(lastReduceData.getWechatMinigameCheckoutAmountTotal());
-        result.setOfficialAccountFollowCountTotal(lastReduceData.getOfficialAccountFollowCountTotal());
-        result.setOfficialAccountFollowRateAll(lastReduceData.getOfficialAccountFollowRateAll());
-        result.setOfficialAccountRegisterUserCountTotal(lastReduceData.getOfficialAccountRegisterUserCountTotal());
-        result.setOfficialAccountRegisterRateAll(lastReduceData.getOfficialAccountRegisterRateAll());
-        result.setOfficialAccountRegisterCostAll(lastReduceData.getOfficialAccountRegisterCostAll());
-        result.setOfficialAccountRegisterAmountTotal(lastReduceData.getOfficialAccountRegisterAmountTotal());
-        result.setOfficialAccountRegisterRoiAll(lastReduceData.getOfficialAccountRegisterRoiAll());
-        result.setOfficialAccountApplyCountTotal(lastReduceData.getOfficialAccountApplyCountTotal());
-        result.setOfficialAccountApplyUserCountTotal(lastReduceData.getOfficialAccountApplyUserCountTotal());
-        result.setOfficialAccountApplyRateAll(lastReduceData.getOfficialAccountApplyRateAll());
-        result.setOfficialAccountApplyCostAll(lastReduceData.getOfficialAccountApplyCostAll());
-        result.setOfficialAccountApplyAmountTotal(lastReduceData.getOfficialAccountApplyAmountTotal());
-        result.setOfficialAccountApplyRoiAll(lastReduceData.getOfficialAccountApplyRoiAll());
-        result.setOfficialAccountOrderCountTotal(lastReduceData.getOfficialAccountOrderCountTotal());
-        result.setOfficialAccountFirstDayOrderCountTotal(lastReduceData.getOfficialAccountFirstDayOrderCountTotal());
-        result.setOfficialAccountOrderUserCountTotal(lastReduceData.getOfficialAccountOrderUserCountTotal());
-        result.setOfficialAccountOrderRateAll(lastReduceData.getOfficialAccountOrderRateAll());
-        result.setOfficialAccountOrderCostAll(lastReduceData.getOfficialAccountOrderCostAll());
-        result.setOfficialAccountOrderAmountTotal(lastReduceData.getOfficialAccountOrderAmountTotal());
-        result.setOfficialAccountFirstDayOrderAmountTotal(lastReduceData.getOfficialAccountFirstDayOrderAmountTotal());
-        result.setOfficialAccountOrderRoiAll(lastReduceData.getOfficialAccountOrderRoiAll());
-        result.setOfficialAccountConsultCountTotal(lastReduceData.getOfficialAccountConsultCountTotal());
-        result.setOfficialAccountReaderCountTotal(lastReduceData.getOfficialAccountReaderCountTotal());
-        result.setOfficialAccountCreditApplyUserCountTotal(lastReduceData.getOfficialAccountCreditApplyUserCountTotal());
-        result.setOfficialAccountCreditUserCountTotal(lastReduceData.getOfficialAccountCreditUserCountTotal());
-        result.setForwardCountTotal(lastReduceData.getForwardCountTotal());
-        result.setForwardUserCountTotal(lastReduceData.getForwardUserCountTotal());
-        result.setNoInterestCountTotal(lastReduceData.getNoInterestCountTotal());
-        result.setNoInterestCountHour(lastReduceData.getNoInterestCountHour());
-
-        if (statDay.equals(lastReduceData.getStatDay())) {
-            result.setCostDeviationRateDay(lastReduceData.getCostDeviationRateDay());
-            result.setCostDay(lastReduceData.getCostDay());
-            result.setCompensationAmountDay(lastReduceData.getCompensationAmountDay());
-            result.setViewCountDay(lastReduceData.getViewCountDay());
-            result.setThousandDisplayPriceDay(lastReduceData.getThousandDisplayPriceDay());
-            result.setValidClickCountDay(lastReduceData.getValidClickCountDay());
-            result.setCtrDay(lastReduceData.getCtrDay());
-            result.setCpcDay(lastReduceData.getCpcDay());
-            result.setValuableClickCountDay(lastReduceData.getValuableClickCountDay());
-            result.setValuableClickRateDay(lastReduceData.getValuableClickRateDay());
-            result.setValuableClickCostDay(lastReduceData.getValuableClickCostDay());
-            result.setConversionsCountDay(lastReduceData.getConversionsCountDay());
-            result.setConversionsCostDay(lastReduceData.getConversionsCostDay());
-            result.setConversionsRateDay(lastReduceData.getConversionsRateDay());
-            result.setDeepConversionsCountDay(lastReduceData.getDeepConversionsCountDay());
-            result.setDeepConversionsCostDay(lastReduceData.getDeepConversionsCostDay());
-            result.setDeepConversionsRateDay(lastReduceData.getDeepConversionsRateDay());
-            result.setOrderCountDay(lastReduceData.getOrderCountDay());
-            result.setFirstDayOrderCountDay(lastReduceData.getFirstDayOrderCountDay());
-            result.setWebOrderCostDay(lastReduceData.getWebOrderCostDay());
-            result.setOrderRateDay(lastReduceData.getOrderRateDay());
-            result.setOrderAmountDay(lastReduceData.getOrderAmountDay());
-            result.setFirstDayOrderAmountDay(lastReduceData.getFirstDayOrderAmountDay());
-            result.setOrderUnitPriceDay(lastReduceData.getOrderUnitPriceDay());
-            result.setOrderRoiDay(lastReduceData.getOrderRoiDay());
-            result.setSignInCountDay(lastReduceData.getSignInCountDay());
-            result.setScanFollowCountDay(lastReduceData.getScanFollowCountDay());
-            result.setWechatAppRegisterUvDay(lastReduceData.getWechatAppRegisterUvDay());
-            result.setWechatMinigameRegisterCostDay(lastReduceData.getWechatMinigameRegisterCostDay());
-            result.setWechatMinigameRegisterRateDay(lastReduceData.getWechatMinigameRegisterRateDay());
-            result.setWechatMinigameArpuDay(lastReduceData.getWechatMinigameArpuDay());
-            result.setWechatMinigameRetentionCountDay(lastReduceData.getWechatMinigameRetentionCountDay());
-            result.setWechatMinigameCheckoutCountDay(lastReduceData.getWechatMinigameCheckoutCountDay());
-            result.setWechatMinigameCheckoutAmountDay(lastReduceData.getWechatMinigameCheckoutAmountDay());
-            result.setOfficialAccountFollowCountDay(lastReduceData.getOfficialAccountFollowCountDay());
-            result.setOfficialAccountFollowRateDay(lastReduceData.getOfficialAccountFollowRateDay());
-            result.setOfficialAccountRegisterUserCountDay(lastReduceData.getOfficialAccountRegisterUserCountDay());
-            result.setOfficialAccountRegisterRateDay(lastReduceData.getOfficialAccountRegisterRateDay());
-            result.setOfficialAccountRegisterCostDay(lastReduceData.getOfficialAccountRegisterCostDay());
-            result.setOfficialAccountRegisterAmountDay(lastReduceData.getOfficialAccountRegisterAmountDay());
-            result.setOfficialAccountRegisterRoiDay(lastReduceData.getOfficialAccountRegisterRoiDay());
-            result.setOfficialAccountApplyCountDay(lastReduceData.getOfficialAccountApplyCountDay());
-            result.setOfficialAccountApplyUserCountDay(lastReduceData.getOfficialAccountApplyUserCountDay());
-            result.setOfficialAccountApplyRateDay(lastReduceData.getOfficialAccountApplyRateDay());
-            result.setOfficialAccountApplyCostDay(lastReduceData.getOfficialAccountApplyCostDay());
-            result.setOfficialAccountApplyAmountDay(lastReduceData.getOfficialAccountApplyAmountDay());
-            result.setOfficialAccountApplyRoiDay(lastReduceData.getOfficialAccountApplyRoiDay());
-            result.setOfficialAccountOrderCountDay(lastReduceData.getOfficialAccountOrderCountDay());
-            result.setOfficialAccountFirstDayOrderCountDay(lastReduceData.getOfficialAccountFirstDayOrderCountDay());
-            result.setOfficialAccountOrderUserCountDay(lastReduceData.getOfficialAccountOrderUserCountDay());
-            result.setOfficialAccountOrderRateDay(lastReduceData.getOfficialAccountOrderRateDay());
-            result.setOfficialAccountOrderCostDay(lastReduceData.getOfficialAccountOrderCostDay());
-            result.setOfficialAccountOrderAmountDay(lastReduceData.getOfficialAccountOrderAmountDay());
-            result.setOfficialAccountFirstDayOrderAmountDay(lastReduceData.getOfficialAccountFirstDayOrderAmountDay());
-            result.setOfficialAccountOrderRoiDay(lastReduceData.getOfficialAccountOrderRoiDay());
-            result.setOfficialAccountConsultCountDay(lastReduceData.getOfficialAccountConsultCountDay());
-            result.setOfficialAccountReaderCountDay(lastReduceData.getOfficialAccountReaderCountDay());
-            result.setOfficialAccountCreditApplyUserCountDay(lastReduceData.getOfficialAccountCreditApplyUserCountDay());
-            result.setOfficialAccountCreditUserCountDay(lastReduceData.getOfficialAccountCreditUserCountDay());
-            result.setForwardCountDay(lastReduceData.getForwardCountDay());
-            result.setForwardUserCountDay(lastReduceData.getForwardUserCountDay());
-            result.setNoInterestCountDay(lastReduceData.getNoInterestCountDay());
-        }
-        return result;
-    }
-}

+ 0 - 172
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -1,172 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.process;
-
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
-import flink.zanxiangnet.ad.monitoring.dao.mapper.PlanStatOfDayDWDMapper;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import com.zanxiangnet.module.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.PlanUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.apache.ibatis.datasource.DataSourceFactory;
-import org.apache.ibatis.mapping.Environment;
-import org.apache.ibatis.session.SqlSession;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.apache.ibatis.session.SqlSessionFactoryBuilder;
-import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
-
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, PlanStatOfMinuteDWD, Long, TimeWindow> {
-    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
-
-    private final OutputTag<PlanStatOfHourDWD> planHourFromMinuteStreamTag;
-
-    private SqlSessionFactory sqlSessionFactory;
-
-    // 历史的天数据
-    private ValueState<PlanStatOfDayDWD> historyDayState;
-    // 上次查询的天数据
-    private ValueState<String> lastQueryDayState;
-    // 之前聚合的昨天的数据
-    private MapState<String, PlanStatOfMinuteDWD> yesterdayMinuteState;
-    // 前 5分钟聚合的数据
-    private MapState<String, PlanStatOfMinuteDWD> lastReduceState;
-
-    public PlanMinuteDWDProcess(OutputTag<PlanStatOfHourDWD> planHourFromMinuteStreamTag) {
-        this.planHourFromMinuteStreamTag = planHourFromMinuteStreamTag;
-    }
-
-    @Override
-    public void open(Configuration conf) {
-        Map<String, String> params = getRuntimeContext()
-                .getExecutionConfig()
-                .getGlobalJobParameters()
-                .toMap();
-
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
-
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
-        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
-        // 开启驼峰规则
-        configuration.setMapUnderscoreToCamelCase(true);
-        configuration.getTypeAliasRegistry().registerAlias(PlanStatOfDayDWD.class);
-        // addMapper一定要放到 alias的后面!!!!!
-        configuration.addMapper(PlanStatOfDayDWDMapper.class);
-        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
-
-        historyDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("historyDayState", PlanStatOfDayDWD.class));
-        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-        yesterdayMinuteState = getRuntimeContext().getMapState(new MapStateDescriptor<>("yesterdayMinuteState", String.class, PlanStatOfMinuteDWD.class));
-        lastReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("lastReduceState", String.class, PlanStatOfMinuteDWD.class));
-    }
-
-    @Override
-    public void process(Long elementCount, ProcessWindowFunction<AdDataOfMinuteODS, PlanStatOfMinuteDWD, Long, TimeWindow>.Context context,
-                        Iterable<AdDataOfMinuteODS> iterable, Collector<PlanStatOfMinuteDWD> collector) throws Exception {
-        long beginTime = context.window().getStart();
-        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
-        LocalDate beginDate = beginDateTime.toLocalDate();
-        String statDay = DateUtil.formatLocalDate(beginDate);
-        int hour = beginDateTime.getHour();
-        long now = System.currentTimeMillis();
-        AdDataOfMinuteODS statODS = null;
-        List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
-        for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {
-            adDataOfMinuteODSList.add(adDataOfMinuteODS);
-            if (adDataOfMinuteODS.getHour() != hour) {
-                continue;
-            }
-            statODS = adDataOfMinuteODS;
-        }
-        if (statODS == null) {
-            return;
-        }
-
-        Long campaignId = statODS.getCampaignId();
-
-        // 昨天聚合的数据
-        PlanStatOfMinuteDWD yesterdayMinuteDWD = yesterdayMinuteState.get(DateUtil.formatLocalDate(beginDate.minusDays(1L)));
-        // 之前的数据
-        String lastQueryDay = lastQueryDayState.value();
-        if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
-            try (SqlSession session = sqlSessionFactory.openSession()) {
-                PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
-                List<PlanStatOfDayDWD> historyDayData = mapper.lastReduceResult(campaignId, null, DateUtil.formatLocalDate(beginDate.minusDays(2L)), 1).stream().peek(value -> {
-                    value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
-                    value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
-                }).collect(Collectors.toList());
-                if (!historyDayData.isEmpty()) {
-                    historyDayState.update(historyDayData.get(historyDayData.size() - 1));
-                }
-                lastQueryDayState.update(statDay);
-            }
-        }
-        PlanStatOfDayDWD beforeYesterdayDayDWD = historyDayState.value();
-
-        PlanStatOfMinuteDWD lastReduceData = lastReduceState.get(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey));
-        // 聚合当天的全部数据
-        PlanStatOfMinuteDWD newAdStat = PlanStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceData, now);
-        collector.collect(newAdStat);
-        if (beginDateTime.getMinute() == 55) {
-            context.output(planHourFromMinuteStreamTag, PlanStatOfHourDWD.byMinuteDWD(newAdStat));
-        }
-        lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
-        yesterdayMinuteState.put(statDay, newAdStat);
-
-        List<String> removeKeys = new ArrayList<>(10);
-        Iterator<Map.Entry<String, PlanStatOfMinuteDWD>> lastIterator = lastReduceState.iterator();
-        while (lastIterator.hasNext()) {
-            Map.Entry<String, PlanStatOfMinuteDWD> temp = lastIterator.next();
-            if (temp.getKey().equals(beginDateTime.format(formatForLastReduceKey))
-                    || temp.getKey().equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
-                    || temp.getKey().equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
-                continue;
-            }
-            removeKeys.add(temp.getKey());
-        }
-        if (!removeKeys.isEmpty()) {
-            for (String key : removeKeys) {
-                lastReduceState.remove(key);
-            }
-        }
-        removeKeys.clear();
-        Iterator<Map.Entry<String, PlanStatOfMinuteDWD>> yesterdayIterator = yesterdayMinuteState.iterator();
-        while (yesterdayIterator.hasNext()) {
-            Map.Entry<String, PlanStatOfMinuteDWD> temp = yesterdayIterator.next();
-            if (temp.getKey().equals(DateUtil.formatLocalDate(beginDate))
-                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
-                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
-                continue;
-            }
-            removeKeys.add(temp.getKey());
-        }
-        if (!removeKeys.isEmpty()) {
-            for (String key : removeKeys) {
-                yesterdayMinuteState.remove(key);
-            }
-        }
-    }
-
-    @Override
-    public void clear(ProcessWindowFunction<AdDataOfMinuteODS, PlanStatOfMinuteDWD, Long, TimeWindow>.Context context) throws Exception {
-    }
-}

+ 0 - 75
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/PlanDayDWDToCkBatchSink.java

@@ -1,75 +0,0 @@
-package flink.zanxiangnet.ad.monitoring.sink;
-
-import com.aliyun.odps.tunnel.TunnelException;
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
-import flink.zanxiangnet.ad.monitoring.dao.mapper.PlanStatOfDayDWDMapper;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.ibatis.datasource.DataSourceFactory;
-import org.apache.ibatis.mapping.Environment;
-import org.apache.ibatis.session.SqlSession;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.apache.ibatis.session.SqlSessionFactoryBuilder;
-import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * 批量数据写出
- */
-@Slf4j
-public class PlanDayDWDToCkBatchSink extends RichSinkFunction<List<PlanStatOfDayDWD>> {
-
-    private SqlSessionFactory sqlSessionFactory;
-
-
-    @Override
-    public void open(Configuration config) throws Exception {
-        Map<String, String> params = getRuntimeContext()
-                .getExecutionConfig()
-                .getGlobalJobParameters()
-                .toMap();
-
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
-
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
-        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
-        // 开启驼峰规则
-        configuration.setMapUnderscoreToCamelCase(true);
-        configuration.getTypeAliasRegistry().registerAlias(PlanStatOfDayDWD.class);
-        // addMapper一定要放到 alias的后面!!!!!
-        configuration.addMapper(PlanStatOfDayDWDMapper.class);
-        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
-    }
-
-    /**
-     * 将值写入到 Sink。每个值都会调用此函数
-     *
-     * @param value
-     * @param context
-     */
-    @Override
-    public void invoke(List<PlanStatOfDayDWD> value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
-        try (SqlSession session = sqlSessionFactory.openSession()) {
-            PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
-            mapper.addBatch(value);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-    }
-}

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/BatchStream.java

@@ -32,7 +32,7 @@ public class BatchStream<T> {
         return stream.assignTimestampsAndWatermarks(WatermarkStrategy.<T>forMonotonousTimestamps()
                         .withTimestampAssigner((SerializableTimestampAssigner<T>) (element, recordTimestamp) -> System.currentTimeMillis()))
                 .windowAll(TumblingEventTimeWindows.of(bufferRefreshTime))
-                .trigger(new TimerCountTrigger<>(maxBufferCount))
+                .trigger(new TimerCountTrigger<>(maxBufferCount, bufferRefreshTime))
                 // 这里不能使用 lambda表达式,flink无法推测类型
                 .apply(new AllWindowFunction<T, List<T>, TimeWindow>() {
                     @Override

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/KeyedBatchStream.java

@@ -37,7 +37,7 @@ public class KeyedBatchStream<T, KEY> {
                         .withTimestampAssigner((SerializableTimestampAssigner<T>) (element, recordTimestamp) -> System.currentTimeMillis()))
                 .keyBy(keySelector)
                 .window(TumblingEventTimeWindows.of(bufferRefreshTime))
-                .trigger(new TimerCountTrigger<>(maxBufferCount))
+                .trigger(new TimerCountTrigger<>(maxBufferCount, bufferRefreshTime))
                 // 这里不能使用 lambda表达式,flink无法推测类型
                 .apply(new WindowFunction<T, List<T>, KEY, TimeWindow>() {
                     @Override

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/AdMinuteODSStreamTrigger.java

@@ -14,7 +14,7 @@ public class AdMinuteODSStreamTrigger extends Trigger<AdDataOfMinuteODS, TimeWin
      * @param time              此处指 eventTime
      * @param timeWindow        start:窗口开始时间,end:窗口关闭时间,maxTimestamp:能接收到数据的的最大时间
      * @param triggerContext    currentWatermark:当前水印时间,第一个元素进入的时候为 Long.MAX
-     * @return
+     * @return CONTINUE:什么都不干,FIRE:触发计算,PURGE:清理窗口元素,FIRE_AND_PURGE:触发计算然后清理窗口元素
      * @throws Exception
      */
     @Override

+ 11 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/TimerCountTrigger.java

@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -23,8 +24,11 @@ public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
     // 缓存的最大数据量
     private final Long maxBufferCount;
 
-    public TimerCountTrigger(Long maxBufferCount) {
+    private final long bufferRefreshTime;
+
+    public TimerCountTrigger(Long maxBufferCount, Time bufferRefreshTime) {
         this.maxBufferCount = maxBufferCount;
+        this.bufferRefreshTime = bufferRefreshTime.toMilliseconds();
     }
 
     /**
@@ -45,6 +49,9 @@ public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
             clear(window, tCtx);
             return TriggerResult.FIRE_AND_PURGE;
         }
+
+        // 注册定时清理器
+        tCtx.registerProcessingTimeTimer(window.getStart() + bufferRefreshTime);
         if (time >= window.getEnd()) {
             clear(window, tCtx);
             return TriggerResult.FIRE_AND_PURGE;
@@ -63,7 +70,8 @@ public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
      */
     @Override
     public TriggerResult onProcessingTime(long time, W window, TriggerContext tCtx) throws Exception {
-        return TriggerResult.CONTINUE;
+        clear(window, tCtx);
+        return TriggerResult.FIRE_AND_PURGE;
     }
 
     /**
@@ -91,6 +99,7 @@ public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
     @Override
     public void clear(W w, TriggerContext tCtx) throws Exception {
         tCtx.getPartitionedState(countStateDesc).clear();
+        tCtx.deleteProcessingTimeTimer(w.getStart() + bufferRefreshTime);
     }
 
     @Override

+ 43 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/util/StateTtlUtil.java

@@ -0,0 +1,43 @@
+package flink.zanxiangnet.ad.monitoring.util;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+
+public class StateTtlUtil {
+
+    /**
+     * 为 RocksDB状态存储构建 TTL 配置
+     *
+     * @param ttl                      过期时间
+     * @return 构建好的 StateTtlConfig
+     */
+    public static StateTtlConfig rocketDBTtl(Time ttl) {
+        return rocketDBTtl(ttl, StateTtlConfig.UpdateType.OnCreateAndWrite, 300);
+    }
+
+    /**
+     * 为 RocksDB状态存储构建 TTL 配置
+     *
+     * @param ttl                      过期时间
+     * @return 构建好的 StateTtlConfig
+     */
+    public static StateTtlConfig rocketDBTtl(Time ttl, StateTtlConfig.UpdateType updateType) {
+        return rocketDBTtl(ttl, updateType, 300);
+    }
+
+    /**
+     * 为 RocksDB状态存储构建 TTL 配置
+     *
+     * @param ttl                      过期时间
+     * @param queryTimeAfterNumEntries 每次处理多少记录后查询过期时间
+     * @return 构建好的 StateTtlConfig
+     */
+    public static StateTtlConfig rocketDBTtl(Time ttl, StateTtlConfig.UpdateType updateType, long queryTimeAfterNumEntries) {
+        return StateTtlConfig.newBuilder(ttl)
+                .setUpdateType(updateType)
+                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
+                .cleanupInRocksdbCompactFilter(queryTimeAfterNumEntries)
+                .build();
+    }
+}