Optimize batch-write performance

wcc · 3 years ago
commit f4b694294b

+ 1 - 1
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -97,7 +97,7 @@
              </filters>
              <transformers>
                <transformer>
-                  <mainClass>flink.zanxiangnet.ad.monitoring.AdDayStreamJob</mainClass>
+                  <mainClass>flink.zanxiangnet.ad.monitoring.AdHourStreamJob</mainClass>
                </transformer>
              </transformers>
            </configuration>

+ 5 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -80,7 +81,7 @@ public class AdDayStreamJob {
                 .map(AdStatOfDayODSDTO::byJson);

         // Write to the raw (ODS) table
-        new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 1000L, 3 * 60 * 1000L)
+        new KeyedBatchStream<>(adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 3000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class))
@@ -117,14 +118,14 @@ public class AdDayStreamJob {
 
         DataStream<AdStatOfDayDWD> adDayStream = adDayDWDMonthStream.union(adDayDWDYearStream);
         // Write to MaxCompute
-        new KeyedBatchStream<>("adDayStream", adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 1000L, 60 * 1000L)
+        new KeyedBatchStream<>(adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 3000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(8)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .setParallelism(8)
                 .name("sink_ad_year_dwd");
-        // Write to ClickHouse
-        new BatchStream<>("adDWDToCkStream", adDayStream, 1000L, 60 * 1000L)
+        // Write to MySQL
+        new BatchStream<>(adDayStream, 2000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new AdDayDWDToDBBatchSink())
                 .name("sink_ad_day_for_db");
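
Note: across these jobs every KeyedBatchStream/BatchStream call site changes the same way — the stream-name argument is dropped, batch sizes grow, and the flush interval becomes a window Time instead of raw milliseconds (see the stream/ classes further down). A before/after sketch of one call site, for orientation:

    // before: explicit name, 1000-element batches, flush interval in milliseconds
    new KeyedBatchStream<>("adDayStream", adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 1000L, 60 * 1000L);
    // after: 3000-element batches, interval expressed as the window size
    new KeyedBatchStream<>(adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 3000L, Time.minutes(3L));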

+ 28 - 19
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -16,7 +16,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
+import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -40,6 +44,7 @@ public class AdHourStreamJob {
 
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(6);

         // Load the config file into Flink's global configuration
         Properties props = new Properties();
@@ -50,7 +55,6 @@ public class AdHourStreamJob {
             configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
         });
         env.getConfig().setGlobalJobParameters(configuration);
-        int parallelismKafka = Integer.parseInt(props.getProperty(ApplicationProperties.FLINK_PARALLELISM_KAFKA));

         // Checkpoint configuration
         env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
@@ -67,13 +71,16 @@ public class AdHourStreamJob {
         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         // How many checkpoint failures to tolerate; default 0
         env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
-        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
+
+        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
+        stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
+        env.setStateBackend(stateBackend);
         if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT))) {
             env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
         }

         KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
-        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka").setParallelism(parallelismKafka);
+        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka").setParallelism(12);

         // Per-minute ad data (ad spend over the previous 5 minutes)
         final OutputTag<AdDataOfMinuteODS> adMinuteStreamTag = new OutputTag<AdDataOfMinuteODS>("adMinuteStream") {
@@ -84,13 +91,13 @@ public class AdHourStreamJob {
 
         // Map and split the stream (a real-time minute stream and a rolled-back hour stream)
         SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn
-                .filter(StringUtils::isNotBlank).setParallelism(parallelismKafka)
-                .process(new AdHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag)).setParallelism(parallelismKafka);
+                .filter(StringUtils::isNotBlank).setParallelism(12)
+                .process(new AdHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag)).setParallelism(12);

         // Minute stream
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
         // Minute stream - write to the raw (ODS) table
-        new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 6000L, 2 * 60 * 1000L)
+        new KeyedBatchStream<>(adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 6000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
@@ -107,12 +114,12 @@ public class AdHourStreamJob {
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new AdMinuteODSStreamTrigger())
                 .process(new AdMinuteDWDProcess())
-                .setParallelism(parallelismKafka);
-        new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 3000L, 60 * 1000L)
+                .setParallelism(12);
+        new KeyedBatchStream<>(adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 5000L, Time.minutes(3L))
                 .toBatch()
-                .setParallelism(6)
+                .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
-                .setParallelism(6)
+                .setParallelism(12)
                 .name("sink_ad_minute_dwd");

         // Minute stream - write to ClickHouse
@@ -120,7 +127,7 @@ public class AdHourStreamJob {
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
                         .process(new CostMinuteProcess());
-        new BatchStream<>("adMinuteDMStream", clickhouseMinuteDmStream, 3000L, 60 * 1000L)
+        new BatchStream<>(clickhouseMinuteDmStream, 3000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new AdMinuteDMToCkBatchSink())
                 .name("sink_ad_minute_dm_clickhouse");
@@ -128,11 +135,11 @@ public class AdHourStreamJob {
         // Hour stream
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // Hour stream - write to the raw (ODS) table
-        new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 3000L, 3 * 60 * 1000L)
+        new KeyedBatchStream<>(adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 3000L, Time.minutes(3L))
                 .toBatch()
-                .setParallelism(6)
+                .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
-                .setParallelism(6)
+                .setParallelism(12)
                 .name("sink_ad_hour_ods");

         // Hour stream - computation
@@ -141,11 +148,11 @@ public class AdHourStreamJob {
                         .process(new AdHourDWDProcess());

         // Hour stream - write to MaxCompute
-        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 3000L, 3 * 60 * 1000L)
+        new KeyedBatchStream<>(adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 3000L, Time.minutes(3L))
                 .toBatch()
-                .setParallelism(6)
+                .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
-                .setParallelism(6)
+                .setParallelism(12)
                 .name("sink_ad_hour_dwd");

         // Convert the minute stream to an hour stream, filling in missing hours
@@ -159,7 +166,7 @@ public class AdHourStreamJob {
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .process(new CostHourProcess());
-        new BatchStream<>("adHourDMStream", adHourDMStream, 2000L, 60 * 1000L)
+        new BatchStream<>(adHourDMStream, 3000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new AdHourDMToCkBatchSink())
                 .name("sink_ad_hour_dm_clickhouse");
@@ -169,9 +176,11 @@ public class AdHourStreamJob {
                 .keyBy(AdStatOfDayDWD::getAdId)
                 .process(new AdDayOnTimeStreamCompletionProcess());
         // Write to ClickHouse
-        new BatchStream<>("adDayDWDToCkStream", dayStreamFromHour, 500L, 60 * 1000L)
+        new BatchStream<>(dayStreamFromHour, 3000L, Time.minutes(1L))
                 .toBatch()
+                .setParallelism(1)
                 .addSink(new AdDayDWDToDBBatchSink())
+                .setParallelism(1)
                 .name("ad_day_dwd_from_hour_sink");

         env.execute("ad_hour_stream_job");
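
The state-backend change is the heart of the tuning: incremental RocksDB checkpoints plus a predefined options profile. A minimal standalone sketch (the newly imported ConfigurableRocksDBOptionsFactory/DefaultConfigurableOptionsFactory are not yet used in the hunks shown; the commented line is a hypothetical next step):

    // Incremental checkpoints: only newly created SST files are uploaded
    EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
    // Profile tuned for spinning disks with ample memory (larger block cache and write buffers)
    stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
    // stateBackend.setRocksDBOptions(new DefaultConfigurableOptionsFactory().setBlockCacheSize("256mb"));
    env.setStateBackend(stateBackend);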

+ 3 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -109,12 +110,12 @@ public class PlanDayStreamJob {
 
         DataStream<PlanStatOfDayDWD> planDayDWDStream = planDayDWDMonthStream.union(planDayDWDYearStream);
         // Write to MaxCompute
-        new KeyedBatchStream<>("planDayDWDStream", planDayDWDStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>(planDayDWDStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))
                 .name("sink_plan_year_dwd");
         // Write to ClickHouse
-        new BatchStream<>("planDWDToCkStream", planDayDWDStream, 1000L, 60 * 1000L).toBatch().addSink(new PlanDayDWDToCkBatchSink());
+        new BatchStream<>(planDayDWDStream, 1000L, Time.minutes(1L)).toBatch().addSink(new PlanDayDWDToCkBatchSink());

         env.execute("plan_day_stream_job");
     }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java

@@ -97,7 +97,7 @@ public class PlanHourStreamJob {
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new PlanMinuteODSStreamTrigger())
                 .process(new PlanMinuteDWDProcess(planHourFromMinuteStreamTag));
-        new KeyedBatchStream<>("planMinuteDWDStream", planMinuteDWDStream.keyBy(PlanStatOfMinuteDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>(planMinuteDWDStream.keyBy(PlanStatOfMinuteDWD::getStatDay), 4000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfMinuteDWD.class))
                 .name("sink_plan_minute_dwd");
@@ -109,7 +109,7 @@ public class PlanHourStreamJob {
                 .keyBy(PlanStatOfHourDWD::getCampaignId)
                 .process(new PlanHourStreamCompletionProcess())
                 .union(planHourDWDStream);
-        new KeyedBatchStream<>("planHourDWDStream", planHourDWDAllStream.keyBy(PlanStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>(planHourDWDAllStream.keyBy(PlanStatOfHourDWD::getStatDay), 4000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfHourDWD.class))
                 .name("sink_plan_hour_dwd");

+ 0 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/properties/ApplicationProperties.java

@@ -1,8 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.pojo.properties;

 public class ApplicationProperties {
-    public static final String FLINK_PARALLELISM_DEFAULT = "flink.parallelism.default";
-    public static final String FLINK_PARALLELISM_KAFKA = "flink.parallelism.kafka";
     public static final String FLINK_CHECKPOINT_SAVEPOINT = "flink.checkpoint.savePath";

     public static final String MAX_COMPUTE_ACCOUNT_ID = "maxCompute.accountId";

+ 9 - 10
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/BatchStream.java

@@ -5,8 +5,9 @@ import flink.zanxiangnet.ad.monitoring.trigger.TimerCountTrigger;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;

 import java.util.List;
@@ -14,26 +15,24 @@ import java.util.List;
 public class BatchStream<T> {

     private final DataStream<T> stream;
-    private final String streamName;
     // Interval at which the buffer is flushed
-    private final Long bufferRefreshTime;
+    private final Time bufferRefreshTime;
     // Maximum number of buffered elements
     private final Long maxBufferCount;

-    public BatchStream(String streamName, DataStream<T> stream, Long maxBufferCount, Long bufferRefreshTime) {
-        this.streamName = streamName;
+    public BatchStream(DataStream<T> stream, Long maxBufferCount, Time bufferRefreshTime) {
         this.stream = stream;
         this.bufferRefreshTime = bufferRefreshTime;
         this.maxBufferCount = maxBufferCount;
     }

     public SingleOutputStreamOperator<List<T>> toBatch() {
-        return stream.windowAll(GlobalWindows.create())
-                .trigger(new TimerCountTrigger<>(streamName + "_trigger", maxBufferCount, bufferRefreshTime))
+        return stream.windowAll(TumblingEventTimeWindows.of(bufferRefreshTime))
+                .trigger(new TimerCountTrigger<>(maxBufferCount))
                 // A lambda cannot be used here: Flink cannot infer its type
-                .apply(new AllWindowFunction<T, List<T>, GlobalWindow>() {
+                .apply(new AllWindowFunction<T, List<T>, TimeWindow>() {
                     @Override
-                    public void apply(GlobalWindow globalWindow, Iterable<T> iterable, Collector<List<T>> collector) throws Exception {
+                    public void apply(TimeWindow window, Iterable<T> iterable, Collector<List<T>> collector) throws Exception {
                         collector.collect(Lists.newArrayList(iterable));
                     }
                 });
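
In effect, toBatch() now buffers inside a tumbling event-time window rather than a GlobalWindow driven by a hand-rolled timer, so a batch is bounded both by maxBufferCount and by the window length. A sketch of a call site (types and sink as used in AdDayStreamJob above):

    // Batch up to 2000 elements; the 1-minute event-time window bounds how long a batch can wait
    new BatchStream<>(adDayStream, 2000L, Time.minutes(1L))
            .toBatch()                             // SingleOutputStreamOperator<List<AdStatOfDayDWD>>
            .addSink(new AdDayDWDToDBBatchSink())
            .name("sink_ad_day_for_db");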

+ 9 - 10
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/KeyedBatchStream.java

@@ -5,8 +5,9 @@ import flink.zanxiangnet.ad.monitoring.trigger.TimerCountTrigger;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;

 import java.util.List;
@@ -14,26 +15,24 @@ import java.util.List;
 public class KeyedBatchStream<T, KEY> {

     private final KeyedStream<T, KEY> stream;
-    private final String streamName;
     // Interval at which the buffer is flushed
-    private final Long bufferRefreshTime;
+    private final Time bufferRefreshTime;
     // Maximum number of buffered elements
     private final Long maxBufferCount;

-    public KeyedBatchStream(String streamName, KeyedStream<T, KEY> stream, Long maxBufferCount, Long bufferRefreshTime) {
-        this.streamName = streamName;
+    public KeyedBatchStream(KeyedStream<T, KEY> stream, Long maxBufferCount, Time bufferRefreshTime) {
         this.stream = stream;
         this.bufferRefreshTime = bufferRefreshTime;
         this.maxBufferCount = maxBufferCount;
     }

     public SingleOutputStreamOperator<List<T>> toBatch() {
-        return stream.window(GlobalWindows.create())
-                .trigger(new TimerCountTrigger<>(streamName + "_trigger", maxBufferCount, bufferRefreshTime))
+        return stream.window(TumblingEventTimeWindows.of(bufferRefreshTime))
+                .trigger(new TimerCountTrigger<>(maxBufferCount))
                 // A lambda cannot be used here: Flink cannot infer its type
-                .apply(new WindowFunction<T, List<T>, KEY, GlobalWindow>() {
+                .apply(new WindowFunction<T, List<T>, KEY, TimeWindow>() {
                     @Override
-                    public void apply(KEY key, GlobalWindow globalWindow, Iterable<T> iterable, Collector<List<T>> collector) throws Exception {
+                    public void apply(KEY key, TimeWindow window, Iterable<T> iterable, Collector<List<T>> collector) throws Exception {
                         collector.collect(Lists.newArrayList(iterable));
                     }
                 });
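
The keyed variant is the same idea applied per key: getPartitionedState scopes the trigger's counter to the key and window, so maxBufferCount applies to each statDay (or other key) independently. A sketch mirroring the call sites above:

    new KeyedBatchStream<>(adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 3000L, Time.minutes(3L))
            .toBatch()
            .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
            .name("sink_ad_hour_dwd");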

+ 43 - 57
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/TimerCountTrigger.java

@@ -1,10 +1,12 @@
 package flink.zanxiangnet.ad.monitoring.trigger;

-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

 /**
  * Fires on a timer && on a count threshold
@@ -12,79 +14,55 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  * @param <T>
  * @param <W>
  */
-public class TimerCountTrigger<T, W extends Window> extends Trigger<T, W> {
-    private static final String STATE_NAME_WINDOW_TIME = "_windowTime";
-    private static final String STATE_NAME_WINDOW_COUNT = "_windowCount";
+@Slf4j
+public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
+
+    private final ReducingStateDescriptor<Long> countStateDesc =
+            new ReducingStateDescriptor<>("count", (Long::sum), LongSerializer.INSTANCE);

-    // Interval at which the buffer is flushed
-    private final Long bufferRefreshTime;
     // Maximum number of buffered elements
     private final Long maxBufferCount;
-    // Trigger name
-    private final String triggerName;

-    public TimerCountTrigger(String triggerName, Long maxBufferCount, Long bufferRefreshTime) {
-        this.triggerName = triggerName;
+    public TimerCountTrigger(Long maxBufferCount) {
         this.maxBufferCount = maxBufferCount;
-        this.bufferRefreshTime = bufferRefreshTime;
     }

     /**
      * Called for each element entering the window
      *
-     * @param element        the element
-     * @param time           the element's arrival time (this timestamp is unreliable)
-     * @param window         the window the element is being added to
-     * @param triggerContext trigger context
+     * @param element the element
+     * @param time    the element's timestamp (event time)
+     * @param window  the window the element is being added to
+     * @param tCtx    trigger context
      * @return
      * @throws Exception
      */
     @Override
-    public TriggerResult onElement(T element, long time, W window, TriggerContext triggerContext) throws Exception {
-        ValueState<Long> windowTimeState = triggerContext.getPartitionedState(new ValueStateDescriptor<>(triggerName + STATE_NAME_WINDOW_TIME, Long.class));
-        ValueState<Long> windowCountState = triggerContext.getPartitionedState(new ValueStateDescriptor<>(triggerName + STATE_NAME_WINDOW_COUNT, Long.class));
-        long now = System.currentTimeMillis();
-        Long windowCount = windowCountState.value() == null ? 1L : windowCountState.value() + 1;
-        Long windowTime = windowTimeState.value() == null ? now : windowTimeState.value();
-        if (windowCount == 1 || windowTime == now) {
-            // Register a timer that fires on processing time
-            triggerContext.registerProcessingTimeTimer(triggerContext.getCurrentProcessingTime() + bufferRefreshTime);
-        }
-
-        if (windowCount >= maxBufferCount || (now - windowTime) >= bufferRefreshTime) {
-            windowCountState.update(0L);
-            windowTimeState.update(now);
+    public TriggerResult onElement(T element, long time, W window, TriggerContext tCtx) throws Exception {
+        ReducingState<Long> windowCountState = tCtx.getPartitionedState(countStateDesc);
+        windowCountState.add(1L);
+        if (windowCountState.get() >= maxBufferCount) {
+            clear(window, tCtx);
             return TriggerResult.FIRE_AND_PURGE;
         }
-
-        windowCountState.update(windowCount);
-        if (windowTimeState.value() == null) {
-            windowTimeState.update(now);
+        if (time >= window.getEnd()) {
+            clear(window, tCtx);
+            return TriggerResult.FIRE_AND_PURGE;
         }
-
         return TriggerResult.CONTINUE;
     }

     /**
      * Called when a processing-time timer set via the trigger context fires
      *
-     * @param time           the timestamp at which the timer fired
+     * @param time   the timestamp at which the timer fired
      * @param window
-     * @param triggerContext
+     * @param tCtx
      * @return
      * @throws Exception
      */
     @Override
-    public TriggerResult onProcessingTime(long time, W window, TriggerContext triggerContext) throws Exception {
-        ValueState<Long> windowTimeState = triggerContext.getPartitionedState(new ValueStateDescriptor<>(triggerName + STATE_NAME_WINDOW_TIME, Long.class));
-        // long now = System.currentTimeMillis();
-        long now = time;
-        Long windowTime = windowTimeState.value();
-        if (now - windowTime >= maxBufferCount) {
-            triggerContext.getPartitionedState(new ValueStateDescriptor<>(triggerName + STATE_NAME_WINDOW_TIME, Long.class)).update(0L);
-            windowTimeState.update(now);
-            return TriggerResult.FIRE_AND_PURGE;
-        }
+    public TriggerResult onProcessingTime(long time, W window, TriggerContext tCtx) throws Exception {
         return TriggerResult.CONTINUE;
     }

@@ -93,27 +71,35 @@ public class TimerCountTrigger<T, W extends Window> extends Trigger<T, W> {
      *
      * @param time
      * @param window
-     * @param triggerContext
+     * @param tCtx
      * @return
      * @throws Exception
      */
     @Override
-    public TriggerResult onEventTime(long time, W window, TriggerContext triggerContext) throws Exception {
-        return TriggerResult.CONTINUE;
+    public TriggerResult onEventTime(long time, W window, TriggerContext tCtx) throws Exception {
+        clear(window, tCtx);
+        return TriggerResult.FIRE_AND_PURGE;
     }

     /**
      * Called when the window is purged
      *
      * @param w
-     * @param triggerContext
+     * @param tCtx
      * @throws Exception
      */
     @Override
-    public void clear(W w, TriggerContext triggerContext) throws Exception {
-        ValueState<Long> windowTime = triggerContext.getPartitionedState(new ValueStateDescriptor<>(triggerName + STATE_NAME_WINDOW_TIME, Long.class));
-        ValueState<Long> windowCount = triggerContext.getPartitionedState(new ValueStateDescriptor<>(triggerName + STATE_NAME_WINDOW_COUNT, Long.class));
-        windowTime.clear();
-        windowCount.clear();
+    public void clear(W w, TriggerContext tCtx) throws Exception {
+        tCtx.getPartitionedState(countStateDesc).clear();
+    }
+
+    @Override
+    public boolean canMerge() {
+        return true;
+    }
+
+    @Override
+    public void onMerge(W window, OnMergeContext ctx) throws Exception {
+        ctx.mergePartitionedState(countStateDesc);
     }
 }
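
Net effect of the trigger rewrite: the only remaining state is a mergeable element count in ReducingState; reaching the count threshold fires early with FIRE_AND_PURGE, and the window's own event-time timer flushes whatever remains at the window end (onEventTime now fires-and-purges instead of ignoring the callback). A worked example of the semantics, assuming maxBufferCount = 3 and a 1-minute window:

    // t=0s   element 1 arrives -> count = 1 -> CONTINUE
    // t=5s   element 2 arrives -> count = 2 -> CONTINUE
    // t=9s   element 3 arrives -> count = 3 >= 3 -> FIRE_AND_PURGE (count state cleared)
    // t=60s  watermark passes the window end -> onEventTime -> FIRE_AND_PURGE of the remainder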