Prechádzať zdrojové kódy

TMP:adstatjob----change

cxyu 3 rokov pred
rodič
commit
b7d1d00c8d

+ 17 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -8,6 +8,8 @@ import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
 import com.tencent.ads.model.DailyReportsGetListStruct;
 import com.tencent.ads.model.HourlyReportsGetListStruct;
+import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkHour;
+import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkMinute;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfDayDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
@@ -15,9 +17,12 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
 import flink.zanxiangnet.ad.monitoring.process.AdMinuteDWDProcess;
+import flink.zanxiangnet.ad.monitoring.process.CostHourProcess;
+import flink.zanxiangnet.ad.monitoring.process.CostMinuteProcess;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
+import flink.zanxiangnet.ad.monitoring.trigger.CostMinuteDMStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
@@ -145,6 +150,18 @@ public class AdStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
                 .name("sink_ad_minute_dwd");
 
+        //cost----分钟数据处理
+        SingleOutputStreamOperator<CostMinuterDM> clickhouseMinuteDmStream =
+                adMinuteDWDStream
+                        .keyBy(AdStatOfMinuteDWD::getAdId)
+                        .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
+                        .trigger(new CostMinuteDMStreamTrigger())
+                        .process(new CostMinuteProcess())
+                        .name("sink_ad_hour_dm_clickhouse");
+
+        BatchSinkMinute batchSinkMinute = new BatchSinkMinute();
+        clickhouseMinuteDmStream.addSink(batchSinkMinute);
+
         // 小时流(直接写到小时报表的 ods)
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 写入原始表