wcc 3 лет назад
Родитель
Commit
432d15f170

+ 11 - 13
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -106,28 +106,26 @@ public class AdHourStreamJob {
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
                         .process(new AdHourDWDProcess());
 
-        DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.map(AdStatOfHourDWD::byMinuteDWD)
-                .keyBy(AdStatOfHourDWD::getAdId)
-                .process(new AdHourOnTimeStreamCompletionProcess())
-                .union(adHourDWDStream
-                        .keyBy(AdStatOfHourDWD::getAdId)
-                        .process(new AdHourRollbackStreamCompletionProcess())
-                );
-        new KeyedBatchStream<>("adHourDWDStream", adHourDWDAllStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
+        // 小时流-写入maxCompute
+        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .name("sink_ad_hour_dwd");
 
-        //小时数据
+        // 小时流-填充 0值
+        SingleOutputStreamOperator<AdStatOfHourDWD> hourStreamFromMinute = adMinuteDWDStream.map(AdStatOfHourDWD::byMinuteDWD)
+                .keyBy(AdStatOfHourDWD::getAdId)
+                .process(new AdHourOnTimeStreamCompletionProcess());
+
+        //小时流-写入 ck
+        DataStream<AdStatOfHourDWD> adHourDWDAllStream = hourStreamFromMinute.union(adHourDWDStream);
         SingleOutputStreamOperator<CostHourDM> clickhouseHourDmStream =
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
-                        .process(new CostHourProcess())
-                        .name("sink_ad_hour_dm_clickhouse");
+                        .process(new CostHourProcess());
 
         BatchSinkHour batchSinkhour = new BatchSinkHour();
-        clickhouseHourDmStream.addSink(batchSinkhour);
-
+        clickhouseHourDmStream.addSink(batchSinkhour).name("sink_ad_hour_dm_clickhouse");
 
         env.execute("ad_hour_stream_job");
     }

+ 0 - 10
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -13,10 +13,6 @@ import java.util.Properties;
 @Slf4j
 public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
 
-    // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
-    private static final Object DUMMY_LOCK = new Object();
-
-    private String sql;
     private Connection connection = null;
 
     public BatchSinkHour() {
@@ -129,7 +125,6 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?)";
-        log.error(costhour.toString());
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         preparedStatement.setString(1, costhour.dt);
         preparedStatement.setString(2, costhour.createTime);
@@ -220,11 +215,7 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
         preparedStatement.setDouble(87, costhour.webRegisterCostHour);
         preparedStatement.setString(88, costhour.agencyAccountId);
         preparedStatement.addBatch();
-        long startTime = System.currentTimeMillis();
-        int ints[] = preparedStatement.executeBatch();
         connection.commit();
-        long endTime = System.currentTimeMillis();
-        log.info("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
         //clickhouse 处理重复数据
         //TODO:数据去重有问题,去除掉非最新的数据
 //        Statement statement_duplicate = connection.createStatement();
@@ -232,7 +223,6 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
 //        statement_duplicate.executeQuery(sql_duplicate);
 //        connection.commit();
         long endTime_dp = System.currentTimeMillis();
-        log.info("数据清理耗时: " + (endTime_dp - endTime));
 
     }
 

+ 10 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java

@@ -2,6 +2,7 @@ package flink.zanxiangnet.ad.monitoring.process;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -15,6 +16,7 @@ import java.time.LocalTime;
 /**
  * 小时流数据补全,把中间没消耗的时间段填充 0
  */
+@Slf4j
 public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
 
     private ValueState<AdStatOfHourDWD> lastReduceState;
@@ -37,12 +39,19 @@ public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lo
         if (lastStatDateTime.compareTo(statDateTime) <= 0) {
             long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
             if (hours > 1) {
+                if (hours > 24) {
+                    log.error("实时:adId[{}] 要造 {}条数据, 从 {} 到 {}结束", adStatOfHourDWD.getAdId(), hours, lastStatDateTime, statDateTime);
+                }
+                long start = System.currentTimeMillis();
                 // 中间有没数据的时间段,需要进行数据填充
-                for (int i = 1; i < hours; i++) {
+                for (int i = 1; i < (hours >= 24 ? 24 : hours); i++) {
                     // 要填充的时间
                     LocalDateTime completionTime = lastStatDateTime.plusHours(i);
                     collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
                 }
+                if (hours > 24) {
+                    log.error("实时:给 adId[{}]造数据用时 {}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start));
+                }
             }
             lastReduceState.update(adStatOfHourDWD);
         }

+ 10 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourRollbackStreamCompletionProcess.java

@@ -2,6 +2,7 @@ package flink.zanxiangnet.ad.monitoring.process;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -15,6 +16,7 @@ import java.time.LocalTime;
 /**
  * 小时流数据补全,把中间没消耗的时间段填充 0
  */
+@Slf4j
 public class AdHourRollbackStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
 
     private ValueState<AdStatOfHourDWD> lastReduceState;
@@ -41,12 +43,19 @@ public class AdHourRollbackStreamCompletionProcess extends KeyedProcessFunction<
         }
         long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
         if (hours > 1) {
+            if (hours > 24) {
+                log.error("回滚:adId[{}] 要造 {}条数据, 从 {} 到 {}结束", adStatOfHourDWD.getAdId(), hours, lastStatDateTime, statDateTime);
+            }
+            long start = System.currentTimeMillis();
             // 中间有没数据的时间段,需要进行数据填充
-            for (int i = 1; i < hours; i++) {
+            for (int i = 1; i < (hours >= 24 ? 24 : hours); i++) {
                 // 要填充的时间
                 LocalDateTime completionTime = lastStatDateTime.plusHours(i);
                 collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
             }
+            if (hours > 24) {
+                log.error("回滚:给 adId[{}]造数据用时 {}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start));
+            }
         }
         collector.collect(adStatOfHourDWD);
         lastReduceState.update(adStatOfHourDWD);