Bladeren bron

日志调整

root 3 jaren geleden
bovenliggende
commit
db0fc13a7b

+ 9 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -79,7 +79,7 @@ public class AdHourStreamJob {
 
         KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adHourTopic,
                 KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
-        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka").setParallelism(8);
+        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka").setParallelism(6);
 
         // 广告分钟数据(前 5分钟的广告消耗数据)
         final OutputTag<AdDataOfMinuteODS> adMinuteStreamTag = new OutputTag<AdDataOfMinuteODS>("adMinuteStream") {
@@ -90,8 +90,8 @@ public class AdHourStreamJob {
 
         // 对流进行映射,拆分(实时的分钟流和回滚的小时流)
         SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn
-                .filter(StringUtils::isNotBlank).setParallelism(8)
-                .process(new AdHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag)).setParallelism(8);
+                .filter(StringUtils::isNotBlank).setParallelism(6)
+                .process(new AdHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag)).setParallelism(6);
 
         // 分钟流
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
@@ -106,12 +106,13 @@ public class AdHourStreamJob {
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new AdMinuteODSStreamTrigger())
                 .process(new AdMinuteDWDProcess())
-                .setParallelism(8);
+                .setParallelism(6);
+        // 分钟流输出到 OSS,最后异步写到 maxCompute
         new KeyedBatchStream<>(adMinuteDWDStream, AdStatOfMinuteDWD::getStatDay, 5000L, Time.minutes(1L))
                 .toBatch()
-                .setParallelism(8)
+                .setParallelism(6)
                 .addSink(new OssBatchStreamSink<>(AdStatOfMinuteDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("ad_stat_of_minute_dwd")))
-                .setParallelism(8)
+                .setParallelism(6)
                 .name("sink_ad_minute_dwd");
 
         //分钟流-写入 ck
@@ -137,9 +138,9 @@ public class AdHourStreamJob {
         // 小时流-写入ck
         new KeyedBatchStream<>(adHourDWDStream, AdStatOfHourDWD::getStatDay, 3000L, Time.minutes(1L))
                 .toBatch()
-                .setParallelism(8)
+                .setParallelism(6)
                 .addSink(new OssBatchStreamSink<>(AdStatOfHourDWD.class, new OssBatchStreamSink.MonitoringGenerateOssObjectName("ad_stat_of_hour_dwd")))
-                .setParallelism(8)
+                .setParallelism(6)
                 .name("sink_ad_hour_dwd");
 
         // 分钟流转小时流同时填充空白的小时

+ 17 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -3,6 +3,8 @@ package flink.zanxiangnet.ad.monitoring;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.zanxiangnet.module.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
+import flink.zanxiangnet.ad.monitoring.sink.OssBatchStreamSink;
+import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -16,12 +18,14 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -60,14 +64,19 @@ public class Test {
                         }
                     }
                 });*/
-        new KeyedBatchStream<>(pojoStream, Pojo::getUserId, 10L, Time.seconds(16))
-                .toBatch()
-                .process(new ProcessFunction<List<Pojo>, String>() {
+        SingleOutputStreamOperator<Pojo> streamOperator = pojoStream.keyBy(Pojo::getUserId).process(new ProcessFunction<Pojo, Pojo>() {
+            @Override
+            public void processElement(Pojo value, ProcessFunction<Pojo, Pojo>.Context ctx, Collector<Pojo> out) throws Exception {
+                out.collect(value);
+            }
+        });
+        new BatchStream<>(streamOperator, 100L, Time.minutes(1))
+                .toBatch().addSink(new SinkFunction<List<Pojo>>() {
                     @Override
-                    public void processElement(List<Pojo> value, ProcessFunction<List<Pojo>, String>.Context ctx, Collector<String> out) throws Exception {
-                        out.collect("收到 " + value.size() + "个元素!" + value.stream().map(Pojo::getIndex).collect(Collectors.toList()));
+                    public void invoke(List<Pojo> value, Context context) throws Exception {
+                        log.error("pojo大小:{}, userIds: {}", value.size(), value.stream().map(Pojo::getUserId).collect(Collectors.toSet()));
                     }
-                }).print();
+                });
         /*pojoStream.keyBy(Pojo::getUserId)
                 .window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8)))
                 .trigger(new Trigger<Pojo, TimeWindow>() {
@@ -252,12 +261,13 @@ public class Test {
 
         @Override
         public void run(SourceContext<Pojo> sourceContext) {
+            Random random = new Random();
             while (isRun) {
                 try {
                     for (int i = 0; i < 24; i++) {
                         long user1Index = index1.incrementAndGet();
                         Pojo pojo = Pojo.builder()
-                                .userId(1)
+                                .userId(random.nextInt(100))
                                 .index(user1Index)
                                 .createTime(BEGIN + ((user1Index - 1) * 60 * 60 * 1000))
                                 .build();

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/TimerCountTrigger.java

@@ -100,7 +100,6 @@ public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
      */
     @Override
     public void clear(W w, TriggerContext tCtx) throws Exception {
-        tCtx.getPartitionedState(countStateDesc).clear();
         tCtx.deleteProcessingTimeTimer(w.getStart() + bufferRefreshTime);
     }