|
@@ -21,6 +21,7 @@ import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
|
|
|
import flink.zanxiangnet.ad.monitoring.util.DateUtil;
|
|
|
import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
|
|
|
import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.flink.api.common.eventtime.*;
|
|
|
import org.apache.flink.api.common.state.MapState;
|
|
@@ -28,6 +29,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
|
|
|
import org.apache.flink.api.common.state.ValueState;
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor;
|
|
|
import org.apache.flink.api.common.typeinfo.Types;
|
|
|
+import org.apache.flink.api.java.functions.KeySelector;
|
|
|
import org.apache.flink.configuration.Configuration;
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource;
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
|
@@ -50,6 +52,7 @@ import java.time.LocalTime;
|
|
|
import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+@Slf4j
|
|
|
public class AdStatJob {
|
|
|
|
|
|
/**
|
|
@@ -71,7 +74,7 @@ public class AdStatJob {
|
|
|
env.getConfig().setGlobalJobParameters(configuration);
|
|
|
|
|
|
KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
|
|
|
- DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adStreamOfMinuteSource_kafka");
|
|
|
+ DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka");
|
|
|
|
|
|
// 广告分钟数据(前 5分钟的广告消耗数据)
|
|
|
final OutputTag<AdDataOfMinuteODS> adMinuteStreamTag = new OutputTag<AdDataOfMinuteODS>("adMinuteStream") {
|
|
@@ -101,6 +104,7 @@ public class AdStatJob {
|
|
|
adODS.setAccountId(dto.getAccountId());
|
|
|
adODS.setAgencyAccountId(dto.getHourlyReportsGetListStruct().getAccountId());
|
|
|
adODS.setCreateTime(new Date(createTime));
|
|
|
+ adODS.removeNull();
|
|
|
context.output(adHourStreamTag, adODS);
|
|
|
} else {
|
|
|
AdDataOfMinuteODS adODS = new AdDataOfMinuteODS();
|
|
@@ -111,6 +115,7 @@ public class AdStatJob {
|
|
|
adODS.setAccountId(dto.getAccountId());
|
|
|
adODS.setAgencyAccountId(dto.getHourlyReportsGetListStruct().getAccountId());
|
|
|
adODS.setCreateTime(new Date(createTime));
|
|
|
+ adODS.removeNull();
|
|
|
context.output(adMinuteStreamTag, adODS);
|
|
|
}
|
|
|
}
|
|
@@ -122,7 +127,8 @@ public class AdStatJob {
|
|
|
// adMinuteODSStream.addSink(new TunnelBatchSink<>(AdDataOfMinuteODS.class, 36000L, 64000L, 3));
|
|
|
new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 4000L, 2 * 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class));
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
|
|
|
+ .name("sink_ad_minute_ods");
|
|
|
|
|
|
SingleOutputStreamOperator<AdStatOfMinuteDWD> adMinuteDWDStream = adMinuteODSStream
|
|
|
// 打水印,允许数据延迟 6分钟,同时指定时间流
|
|
@@ -136,7 +142,8 @@ public class AdStatJob {
|
|
|
// .addSink(new TunnelBatchSink<>(AdStatOfMinuteDWD.class, 30000L, 365L, 6));
|
|
|
new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 4000L, 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class));
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
|
|
|
+ .name("sink_ad_minute_dwd");
|
|
|
|
|
|
// 小时流(直接写到小时报表的 ods)
|
|
|
DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
|
|
@@ -144,7 +151,8 @@ public class AdStatJob {
|
|
|
// adHourODSStream.addSink(new TunnelBatchSink<>(AdDataOfHourODS.class, 36000L, 64000L, 10));
|
|
|
new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 4000L, 5 * 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class));
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
|
|
|
+ .name("sink_ad_hour_ods");
|
|
|
|
|
|
SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream = adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
|
|
|
.countWindow(1).process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
|
|
@@ -210,13 +218,14 @@ public class AdStatJob {
|
|
|
//.addSink(new TunnelBatchSink<>(AdStatOfHourDWD.class, 30000L, 365L, 6));
|
|
|
new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class));
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
|
|
|
+ .name("sink_ad_hour_dwd");
|
|
|
|
|
|
|
|
|
// ------------------------------------------------------- 处理广告的天数据 -----------------------------------------
|
|
|
- /*KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
|
|
|
+ KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
|
|
|
|
|
|
- DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adStreamOfMinuteSource_kafka");
|
|
|
+ DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adDaySource_kafka");
|
|
|
|
|
|
// 广告日数据。往前回滚 10天
|
|
|
final OutputTag<AdDataOfDayODS> adDayStreamRollDayTag = new OutputTag<AdDataOfDayODS>("adDayStreamRollDayTag") {
|
|
@@ -239,6 +248,7 @@ public class AdStatJob {
|
|
|
adODS.setAdgroupId(struct.getAdgroupId());
|
|
|
adODS.setAdId(struct.getAdId());
|
|
|
adODS.setCreateTime(createTime);
|
|
|
+ adODS.removeNull();
|
|
|
return AdStatOfDayODSDTO.builder()
|
|
|
.startDate(dto.getStartDate())
|
|
|
.endDate(dto.getEndDate())
|
|
@@ -248,7 +258,8 @@ public class AdStatJob {
|
|
|
// adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).addSink(new TunnelBatchSink<>(AdDataOfDayODS.class, 36000L, 6000L, 10));
|
|
|
new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 4000L, 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class));
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class))
|
|
|
+ .name("sink_ad_day_ods");
|
|
|
|
|
|
// 拆分流
|
|
|
SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStreamSplit = adDayODSStream.process(new ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>() {
|
|
@@ -329,7 +340,8 @@ public class AdStatJob {
|
|
|
//.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
|
|
|
new KeyedBatchStream<>("adDayDWDStream", adDayDWDStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
|
|
|
+ .name("sink_ad_day_dwd");
|
|
|
|
|
|
SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
|
|
|
.keyBy(AdDataOfDayODS::getAdId)
|
|
@@ -353,8 +365,9 @@ public class AdStatJob {
|
|
|
//.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
|
|
|
new KeyedBatchStream<>("adDayDWDYearStream", adDayDWDYearStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
|
|
|
.toBatch()
|
|
|
- .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));*/
|
|
|
+ .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
|
|
|
+ .name("sink_ad_year_dwd");
|
|
|
|
|
|
- env.execute();
|
|
|
+ env.execute("job_ad");
|
|
|
}
|
|
|
}
|