Bladeren bron

优化并行度

wcc 3 jaren geleden
bovenliggende
commit
670c9954d5

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -128,7 +128,9 @@ public class AdDayStreamJob {
         // 写入 mysql
         new BatchStream<>(adDayStream, 2000L, Time.minutes(1L))
                 .toBatch()
+                .setParallelism(1)
                 .addSink(new AdDayDWDToDBBatchSink())
+                .setParallelism(1)
                 .name("sink_ad_day_for_db");
 
         env.execute(isTest ? "ad_day_stream_job_test" : "ad_day_stream_job");

+ 4 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -125,7 +125,9 @@ public class AdHourStreamJob {
                         .process(new CostMinuteProcess());
         new BatchStream<>(clickhouseMinuteDmStream, 3000L, Time.minutes(1L))
                 .toBatch()
+                .setParallelism(1)
                 .addSink(new AdMinuteDMToCkBatchSink())
+                .setParallelism(1)
                 .name("sink_ad_minute_dm_clickhouse");
 
         // 小时流
@@ -164,7 +166,9 @@ public class AdHourStreamJob {
                         .process(new CostHourProcess());
         new BatchStream<>(adHourDMStream, 3000L, Time.minutes(1L))
                 .toBatch()
+                .setParallelism(1)
                 .addSink(new AdHourDMToCkBatchSink())
+                .setParallelism(1)
                 .name("sink_ad_hour_dm_clickhouse");
 
         // 小时流转天流同时填充空白天数据