Pārlūkot izejas kodu

调整任务失败的重启策略

root 3 gadi atpakaļ
vecāks
revīzija
7d8729c3e7

+ 6 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayODSStreamJob.java

@@ -9,6 +9,7 @@ import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
@@ -20,6 +21,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 原始数据流直接入库
@@ -30,6 +32,10 @@ public class AdDayODSStreamJob {
     public static void main(String[] args) throws Exception {
         boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(6, TimeUnit.SECONDS)));// 5:最大重试次数、6:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.of(5, TimeUnit.SECONDS), org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();

+ 4 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -32,6 +32,7 @@ import org.apache.flink.util.OutputTag;
 
 import java.time.LocalDate;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public class AdDayStreamJob {
@@ -40,8 +41,9 @@ public class AdDayStreamJob {
         boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // 任务失败后的重启策略
-        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 0));
-        env.setRestartStrategy(RestartStrategies.noRestart());
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(6, TimeUnit.SECONDS)));// 5:最大重试次数、6:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.of(5, TimeUnit.SECONDS), org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();

+ 6 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourODSStreamJob.java

@@ -12,6 +12,7 @@ import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
@@ -25,6 +26,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.OutputTag;
 
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 原始数据流直接入库
@@ -35,6 +37,10 @@ public class AdHourODSStreamJob {
     public static void main(String[] args) throws Exception {
         boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(6, TimeUnit.SECONDS)));// 5:最大重试次数、6:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.of(5, TimeUnit.SECONDS), org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();

+ 4 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -31,6 +31,7 @@ import org.apache.flink.util.OutputTag;
 
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public class AdHourStreamJob {
@@ -39,8 +40,9 @@ public class AdHourStreamJob {
         boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // 任务失败后的重启策略
-        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 0));
-        env.setRestartStrategy(RestartStrategies.noRestart());
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(6, TimeUnit.SECONDS)));// 5:最大重试次数、6:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.of(5, TimeUnit.SECONDS), org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
         // 设置默认的并行度
         env.setParallelism(6);