浏览代码

优化批量写入的性能

wcc 3 年之前
父节点
当前提交
7ea7a3a7f8

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -81,7 +81,7 @@ public class AdDayStreamJob {
                 .map(AdStatOfDayODSDTO::byJson);
 
         // 写入原始表
-        new KeyedBatchStream<>(adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS), AdDataOfDayODS::getStatDay, 3000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class))
@@ -118,7 +118,7 @@ public class AdDayStreamJob {
 
         DataStream<AdStatOfDayDWD> adDayStream = adDayDWDMonthStream.union(adDayDWDYearStream);
         // 写入 maxCompute
-        new KeyedBatchStream<>(adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adDayStream, AdStatOfDayDWD::getStatDay, 3000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(8)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -97,7 +97,7 @@ public class AdHourStreamJob {
         // 分钟流
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
         // 分钟流-写入原始表
-        new KeyedBatchStream<>(adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 6000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adMinuteODSStream, AdDataOfMinuteODS::getStatDay, 6000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
@@ -115,7 +115,7 @@ public class AdHourStreamJob {
                 .trigger(new AdMinuteODSStreamTrigger())
                 .process(new AdMinuteDWDProcess())
                 .setParallelism(12);
-        new KeyedBatchStream<>(adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 5000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adMinuteDWDStream, AdStatOfMinuteDWD::getStatDay, 5000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
@@ -135,7 +135,7 @@ public class AdHourStreamJob {
         // 小时流
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 小时流-写入原始表
-        new KeyedBatchStream<>(adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adHourODSStream, AdDataOfHourODS::getStatDay, 3000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
@@ -148,7 +148,7 @@ public class AdHourStreamJob {
                         .process(new AdHourDWDProcess());
 
         // 小时流-写入maxCompute
-        new KeyedBatchStream<>(adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 3000L, Time.minutes(3L))
+        new KeyedBatchStream<>(adHourDWDStream, AdStatOfHourDWD::getStatDay, 3000L, Time.minutes(3L))
                 .toBatch()
                 .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -110,7 +110,7 @@ public class PlanDayStreamJob {
 
         DataStream<PlanStatOfDayDWD> planDayDWDStream = planDayDWDMonthStream.union(planDayDWDYearStream);
         // 写入 maxCompute
-        new KeyedBatchStream<>(planDayDWDStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, Time.minutes(1L))
+        new KeyedBatchStream<>(planDayDWDStream, PlanStatOfDayDWD::getStatDay, 4000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))
                 .name("sink_plan_year_dwd");

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java

@@ -97,7 +97,7 @@ public class PlanHourStreamJob {
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new PlanMinuteODSStreamTrigger())
                 .process(new PlanMinuteDWDProcess(planHourFromMinuteStreamTag));
-        new KeyedBatchStream<>(planMinuteDWDStream.keyBy(PlanStatOfMinuteDWD::getStatDay), 4000L, Time.minutes(1L))
+        new KeyedBatchStream<>(planMinuteDWDStream, PlanStatOfMinuteDWD::getStatDay, 4000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfMinuteDWD.class))
                 .name("sink_plan_minute_dwd");
@@ -109,7 +109,7 @@ public class PlanHourStreamJob {
                 .keyBy(PlanStatOfHourDWD::getCampaignId)
                 .process(new PlanHourStreamCompletionProcess())
                 .union(planHourDWDStream);
-        new KeyedBatchStream<>(planHourDWDAllStream.keyBy(PlanStatOfHourDWD::getStatDay), 4000L,Time.minutes(1L))
+        new KeyedBatchStream<>(planHourDWDAllStream, PlanStatOfHourDWD::getStatDay, 4000L, Time.minutes(1L))
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfHourDWD.class))
                 .name("sink_plan_hour_dwd");

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/BatchStream.java

@@ -2,6 +2,8 @@ package flink.zanxiangnet.ad.monitoring.stream;
 
 import com.google.common.collect.Lists;
 import flink.zanxiangnet.ad.monitoring.trigger.TimerCountTrigger;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
@@ -27,7 +29,9 @@ public class BatchStream<T> {
     }
 
     public SingleOutputStreamOperator<List<T>> toBatch() {
-        return stream.windowAll(TumblingEventTimeWindows.of(bufferRefreshTime))
+        return stream.assignTimestampsAndWatermarks(WatermarkStrategy.<T>forMonotonousTimestamps()
+                        .withTimestampAssigner((SerializableTimestampAssigner<T>) (element, recordTimestamp) -> System.currentTimeMillis()))
+                .windowAll(TumblingEventTimeWindows.of(bufferRefreshTime))
                 .trigger(new TimerCountTrigger<>(maxBufferCount))
                 // 这里不能使用 lambda表达式,flink无法推测类型
                 .apply(new AllWindowFunction<T, List<T>, TimeWindow>() {

+ 12 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/KeyedBatchStream.java

@@ -2,6 +2,10 @@ package flink.zanxiangnet.ad.monitoring.stream;
 
 import com.google.common.collect.Lists;
 import flink.zanxiangnet.ad.monitoring.trigger.TimerCountTrigger;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
@@ -14,20 +18,25 @@ import java.util.List;
 
 public class KeyedBatchStream<T, KEY> {
 
-    private final KeyedStream<T, KEY> stream;
+    private final DataStream<T> stream;
+    private final KeySelector<T, KEY> keySelector;
     // 缓存刷新的间隔时间
     private final Time bufferRefreshTime;
     // 缓存的最大数据量
     private final Long maxBufferCount;
 
-    public KeyedBatchStream(KeyedStream<T, KEY> stream, Long maxBufferCount, Time bufferRefreshTime) {
+    public KeyedBatchStream(DataStream<T> stream, KeySelector<T, KEY> keySelector, Long maxBufferCount, Time bufferRefreshTime) {
         this.stream = stream;
+        this.keySelector = keySelector;
         this.bufferRefreshTime = bufferRefreshTime;
         this.maxBufferCount = maxBufferCount;
     }
 
     public SingleOutputStreamOperator<List<T>> toBatch() {
-        return stream.window(TumblingEventTimeWindows.of(bufferRefreshTime))
+        return stream.assignTimestampsAndWatermarks(WatermarkStrategy.<T>forMonotonousTimestamps()
+                        .withTimestampAssigner((SerializableTimestampAssigner<T>) (element, recordTimestamp) -> System.currentTimeMillis()))
+                .keyBy(keySelector)
+                .window(TumblingEventTimeWindows.of(bufferRefreshTime))
                 .trigger(new TimerCountTrigger<>(maxBufferCount))
                 // 这里不能使用 lambda表达式,flink无法推测类型
                 .apply(new WindowFunction<T, List<T>, KEY, TimeWindow>() {