Преглед на файлове

优化小时流(从从分钟流中拆出整点数据到小时流

wcc преди 3 години
родител
ревизия
a4e0fc0e31

+ 1 - 1
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -88,7 +88,7 @@
               </filters>
               <transformers>
                 <transformer>
-                  <mainClass>flink.zanxiangnet.ad.monitoring.AdStatJob</mainClass>
+                  <mainClass>flink.zanxiangnet.ad.monitoring.PlanStatJob</mainClass>
                 </transformer>
               </transformers>
             </configuration>

+ 10 - 4
flink-ad-monitoring/pom.xml

@@ -118,6 +118,12 @@ under the License.
             <version>${odps.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>3.10.2</version>
+        </dependency>
+
         <dependency>
             <groupId>com.tencent.ads</groupId>
             <artifactId>marketing-api-java-sdk</artifactId>
@@ -145,9 +151,9 @@ under the License.
 
         <!-- Java操作 csv文件 -->
         <dependency>
-            <groupId>net.sf.supercsv</groupId>
-            <artifactId>super-csv</artifactId>
-            <version>2.4.0</version>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+            <version>1.9.0</version>
         </dependency>
     </dependencies>
 
@@ -202,7 +208,7 @@ under the License.
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>flink.zanxiangnet.ad.monitoring.AdStatJob</mainClass>
+                                    <mainClass>flink.zanxiangnet.ad.monitoring.PlanStatJob</mainClass>
                                 </transformer>
                             </transformers>
                         </configuration>

+ 9 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -85,6 +85,9 @@ public class AdStatJob {
         // 广告小时数据(往前回滚 10天)
         final OutputTag<AdDataOfHourODS> adHourStreamTag = new OutputTag<AdDataOfHourODS>("adHourStream") {
         };
+        // 广告小时数据(从分钟数据流切出来的整点数据)
+        final OutputTag<AdStatOfHourDWD> adHourFromMinuteStreamTag = new OutputTag<AdStatOfHourDWD>("adHourFromMinuteStream") {
+        };
 
         // 对流进行映射,拆分(实时的分钟流和回滚的小时流)
         SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn.filter(StringUtils::isNotBlank)
@@ -141,7 +144,7 @@ public class AdStatJob {
                 // 开一个 5分钟的滚动窗口
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new AdMinuteODSStreamTrigger())
-                .process(new AdMinuteDWDProcess());
+                .process(new AdMinuteDWDProcess(adHourFromMinuteStreamTag));
         // .addSink(new TunnelBatchSink<>(AdStatOfMinuteDWD.class, 30000L, 365L, 6));
         new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
@@ -162,7 +165,7 @@ public class AdStatJob {
 
 
         //cost----小时数据处理
-        SingleOutputStreamOperator<CostHourDM> clickhouseMinuteHourDmStream =
+        /*SingleOutputStreamOperator<CostHourDM> clickhouseMinuteHourDmStream =
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
                         .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
@@ -171,7 +174,7 @@ public class AdStatJob {
                         .name("sink_ad_minute_hour_dm_clickhouse");
 
         BatchSinkHour batchSinkMinuteHour = new BatchSinkHour();
-        clickhouseMinuteHourDmStream.addSink(batchSinkMinuteHour);
+        clickhouseMinuteHourDmStream.addSink(batchSinkMinuteHour);*/
 
 
         // 小时流(直接写到小时报表的 ods)
@@ -249,15 +252,16 @@ public class AdStatJob {
                                 collector.collect(newStatData);
                             }
                         });
+        DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.getSideOutput(adHourFromMinuteStreamTag).union(adHourDWDStream);
         //.addSink(new TunnelBatchSink<>(AdStatOfHourDWD.class, 30000L, 365L, 6));
-        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>("adHourDWDStream", adHourDWDAllStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .name("sink_ad_hour_dwd");
 
         //小时数据
         SingleOutputStreamOperator<CostHourDM> clickhouseHourDmStream =
-                adHourDWDStream
+                adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .countWindow(1)
                         .process(new CostHourProcess())

+ 7 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfHourDWD.java

@@ -2,6 +2,7 @@ package flink.zanxiangnet.ad.monitoring.pojo.entity;
 
 import com.aliyun.odps.data.Record;
 import com.google.gson.annotations.SerializedName;
+import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeColumn;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
 import flink.zanxiangnet.ad.monitoring.util.ObjectUtil;
@@ -1200,6 +1201,12 @@ public class AdStatOfHourDWD {
     @SerializedName("no_interest_count_hour")
     private Long noInterestCountHour;
 
+    public static AdStatOfHourDWD byMinuteDWD(AdStatOfMinuteDWD minuteDWD) {
+        AdStatOfHourDWD result = new AdStatOfHourDWD();
+        BeanUtils.copyProperties(minuteDWD, result);
+        return result;
+    }
+
     public static AdStatOfHourDWD byODS(AdDataOfHourODS adODS) {
         AdStatOfHourDWD adStatOfHour = new AdStatOfHourDWD();
         adStatOfHour.setStatDay(adODS.getStatDay());

+ 12 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -7,9 +7,7 @@ import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfMinuteODS;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
@@ -21,6 +19,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -33,8 +32,10 @@ import java.util.stream.Collectors;
 
 public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow> {
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
-    private Odps odps;
 
+    private final OutputTag<AdStatOfHourDWD> adHourFromMinuteStreamTag;
+
+    private Odps odps;
     // 历史的天数据
     private ValueState<AdStatOfDayDWD> historyDayState;
     // 上次查询的天数据
@@ -44,6 +45,10 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
     // 前 5分钟聚合的数据
     private MapState<String, AdStatOfMinuteDWD> lastReduceState;
 
+    public AdMinuteDWDProcess(OutputTag<AdStatOfHourDWD> adHourFromMinuteStreamTag) {
+        this.adHourFromMinuteStreamTag = adHourFromMinuteStreamTag;
+    }
+
     @Override
     public void open(Configuration conf) {
         Map<String, String> params = getRuntimeContext()
@@ -117,6 +122,9 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         // 聚合当天的全部数据
         AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceData, now);
         collector.collect(newAdStat);
+        if (beginDateTime.getMinute() == 55) {
+            context.output(adHourFromMinuteStreamTag, AdStatOfHourDWD.byMinuteDWD(newAdStat));
+        }
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);