Ver Fonte

移除 state的清理(一个不稳定的操作,可能包含炸内存的风险)

wcc há 3 anos
pai
commit
11bfe8993c

+ 6 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -48,7 +48,7 @@ public class AdDayStreamJob {
         env.getConfig().setGlobalJobParameters(configuration);
 
         // checkpoint配置
-        env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
+        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
         // checkpoint执行超时时间,超时则 checkpoint失败
         env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
         // checkpoint执行最小间隔时间
@@ -80,9 +80,11 @@ public class AdDayStreamJob {
                 .map(AdStatOfDayODSDTO::byJson);
 
         // 写入原始表
-        new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 1000L, 60 * 1000L)
+        new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 1000L, 3 * 60 * 1000L)
                 .toBatch()
+                .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class))
+                .setParallelism(12)
                 .name("sink_ad_day_ods");
 
         // 拆分流
@@ -116,7 +118,9 @@ public class AdDayStreamJob {
         // 写入 maxCompute
         new KeyedBatchStream<>("adDayStream", adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 1000L, 60 * 1000L)
                 .toBatch()
+                .setParallelism(6)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
+                .setParallelism(6)
                 .name("sink_ad_year_dwd");
         // 写入 ck
         new BatchStream<>("adDWDToCkStream", adDayStream, 1000L, 60 * 1000L).toBatch().addSink(new AdDayDWDToCkBatchSink());

+ 9 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -53,7 +53,7 @@ public class AdHourStreamJob {
         int parallelismKafka = Integer.parseInt(props.getProperty(ApplicationProperties.FLINK_PARALLELISM_KAFKA));
 
         // checkpoint配置
-        env.enableCheckpointing(2 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
+        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
         // checkpoint执行超时时间,超时则 checkpoint失败
         env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
         // checkpoint执行最小间隔时间
@@ -92,7 +92,9 @@ public class AdHourStreamJob {
         // 分钟流-写入原始表
         new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 6000L, 2 * 60 * 1000L)
                 .toBatch()
+                .setParallelism(12)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
+                .setParallelism(12)
                 .name("sink_ad_minute_ods");
 
         // 分钟流-计算
@@ -108,7 +110,9 @@ public class AdHourStreamJob {
                 .setParallelism(parallelismKafka);
         new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 3000L, 60 * 1000L)
                 .toBatch()
+                .setParallelism(6)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
+                .setParallelism(6)
                 .name("sink_ad_minute_dwd");
 
         //分钟流-写入 ck
@@ -126,7 +130,9 @@ public class AdHourStreamJob {
         // 小时流-写入原始表
         new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 3000L, 3 * 60 * 1000L)
                 .toBatch()
+                .setParallelism(6)
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
+                .setParallelism(6)
                 .name("sink_ad_hour_ods");
 
         // 小时流-计算
@@ -137,7 +143,9 @@ public class AdHourStreamJob {
         // 小时流-写入maxCompute
         new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 3000L, 3 * 60 * 1000L)
                 .toBatch()
+                .setParallelism(6)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
+                .setParallelism(6)
                 .name("sink_ad_hour_dwd");
 
         // 分钟流转小时流同时填充空白的小时

+ 0 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java

@@ -87,16 +87,12 @@ public class AdDayOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lon
             }
             long days = DateUtil.intervalOfDays(lastStatDate, statDate);
             if (days > 1) {
-                long start = System.currentTimeMillis();
                 // 中间有没数据的时间段,需要进行数据填充
                 for (int i = 1; i < days; i++) {
                     // 要填充的时间
                     LocalDate completionDay = lastStatDate.plusDays(i);
                     collector.collect(AdStatOfDayDWD.completion(completionDay, lastReduce));
                 }
-                if (days > 12) {
-                    log.error("实时天:给 adId[{}]造数据用时 {},总计造了:{},起始时间:{},截至时间;{}", adStatOfDayDWD.getAdId(), (System.currentTimeMillis() - start), days - 1, lastStatDate, statDate);
-                }
             }
             collector.collect(adStatOfDayDWD);
         }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -114,8 +114,8 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
         historyState.put(element.getStatDay(), dayODSHourMap);
 
         // 往前清理 15天的数据
-        historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
+        /*historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
         historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(16L)));
-        historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(17L)));
+        historyState.remove(DateUtil.formatLocalDate(statDay.minusDays(17L)));*/
     }
 }

+ 0 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java

@@ -39,16 +39,12 @@ public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lo
         if (lastStatDateTime.compareTo(statDateTime) <= 0) {
             long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
             if (hours > 1) {
-                long start = System.currentTimeMillis();
                 // 中间有没数据的时间段,需要进行数据填充
                 for (int i = 1; i < hours; i++) {
                     // 要填充的时间
                     LocalDateTime completionTime = lastStatDateTime.plusHours(i);
                     collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
                 }
-                if (hours > 12) {
-                    log.error("实时小时:给 adId[{}]造数据用时 {},总计造了:{},起始时间:{},截至时间;{}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start), hours - 1, lastStatDateTime, statDateTime);
-                }
             }
             lastReduceState.update(adStatOfHourDWD);
         }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -186,7 +186,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
     }
 
     private void clearState(LocalDateTime beginDateTime) throws Exception {
-        List<String> removeKeys = new ArrayList<>(20);
+        /*List<String> removeKeys = new ArrayList<>(20);
         Iterator<Map.Entry<String, AdStatOfMinuteDWD>> lastIterator = lastReduceState.iterator();
 
         String delLastReduceKey = beginDateTime.minusDays(1).format(formatForLastReduceKey);
@@ -200,6 +200,6 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             for (String key : removeKeys) {
                 lastReduceState.remove(key);
             }
-        }
+        }*/
     }
 }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -101,8 +101,8 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
 
 
         collector.collect(costHourDM);
-        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
-        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));
+        /*historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));*/
 
     }
 

+ 7 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -93,8 +93,13 @@ public class CostMinuteProcess extends KeyedProcessFunction<Long, AdStatOfMinute
         costMinuterDM.setCostLastThreeTrend(costLastThreeTrend);
 
         collector.collect(costMinuterDM);
-        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
-        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));
+        /*historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));*/
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
     }
 
     //数据格式转换

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java

@@ -110,6 +110,6 @@ public class PlanHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourO
         lastReduceState.put(newStatData.getStatDay(), newStatData);
         collector.collect(newStatData);
 
-        lastReduceState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
+        // lastReduceState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
     }
 }

+ 9 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/TunnelBatchStreamSink.java

@@ -79,10 +79,14 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
      */
     @Override
     public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
+        long start = System.currentTimeMillis();
         T element = value.get(0);
         String partitionStr = generatePartitionStr(element);
         TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr), true);
         TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
+        long end = System.currentTimeMillis();
+        log.error("准备写入 maxCompute[{}], 准备用时:{}", value.size(), (end - start));
+        start = end;
         for (T t : value) {
             Record record = uploadSession.newRecord();
             for (BeanUtil.FieldInfo fieldInfo : fieldInfoList) {
@@ -100,6 +104,9 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
             // append只是写入内存
             pack.append(record);
         }
+        end = System.currentTimeMillis();
+        log.error("append {}条数据用时:{}", value.size(), end - start);
+        start = end;
         int retry = 0;
         do {
             try {
@@ -113,6 +120,8 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
                 }
             }
         } while (retry++ < 3);
+        end = System.currentTimeMillis();
+        log.error("写入 {}条数据用时:{}", value.size(), end - start);
     }
 
     @Override