Explorar o código

加入测试环境

wcc hai 3 anos
pai
achega
726ba3de80

+ 4 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -36,11 +36,12 @@ import java.util.Properties;
 public class AdDayStreamJob {
 
     public static void main(String[] args) throws Exception {
+        boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(AdDayStreamJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdHourStreamJob.class.getResourceAsStream(isTest ? "/application.test.properties" : "/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);
@@ -68,7 +69,7 @@ public class AdDayStreamJob {
             env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
         }
 
-        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
+        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adDaySource_kafka");
 
         // 广告日数据。往前回滚 10天
@@ -130,6 +131,6 @@ public class AdDayStreamJob {
                 .addSink(new AdDayDWDToDBBatchSink())
                 .name("sink_ad_day_for_db");
 
-        env.execute("ad_day_stream_job");
+        env.execute(isTest ? "ad_day_stream_job_test" : "ad_day_stream_job");
     }
 }

+ 4 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -37,18 +37,14 @@ import java.util.*;
 @Slf4j
 public class AdHourStreamJob {
 
-    /**
-     * 可能有数据的最早日期
-     */
-    private static final String OLDEST_DAY = "2019-01-01";
-
     public static void main(String[] args) throws Exception {
+        boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(6);
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(AdHourStreamJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdHourStreamJob.class.getResourceAsStream(isTest ? "/application.test.properties" : "/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);
@@ -79,7 +75,7 @@ public class AdHourStreamJob {
             env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
         }
 
-        KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
+        KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
         DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka").setParallelism(12);
 
         // 广告分钟数据(前 5分钟的广告消耗数据)
@@ -183,6 +179,6 @@ public class AdHourStreamJob {
                 .setParallelism(1)
                 .name("ad_day_dwd_from_hour_sink");
 
-        env.execute("ad_hour_stream_job");
+        env.execute(isTest ? "ad_hour_stream_job_test" : "ad_hour_stream_job");
     }
 }

+ 4 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -35,11 +35,12 @@ import java.util.Properties;
 @Slf4j
 public class PlanDayStreamJob {
     public static void main(String[] args) throws Exception {
+        boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(PlanHourStreamJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdHourStreamJob.class.getResourceAsStream(isTest ? "/application.test.properties" : "/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);
@@ -67,7 +68,7 @@ public class PlanDayStreamJob {
             env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
         }
 
-        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planDayConsumerGroup);
+        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planDayConsumerGroup);
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "planDaySource_kafka");
 
         // 广告日数据。往前回滚 10天
@@ -117,6 +118,6 @@ public class PlanDayStreamJob {
         // 写入 ck
         new BatchStream<>(planDayDWDStream, 1000L, Time.minutes(1L)).toBatch().addSink(new PlanDayDWDToCkBatchSink());
 
-        env.execute("plan_day_stream_job");
+        env.execute(isTest ? "plan_day_stream_job_test" : "plan_day_stream_job");
     }
 }

+ 4 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java

@@ -32,17 +32,13 @@ import java.util.*;
 
 public class PlanHourStreamJob {
 
-    /**
-     * 可能有数据的最早日期
-     */
-    private static final String OLDEST_DAY = "2019-01-01";
-
     public static void main(String[] args) throws Exception {
+        boolean isTest = false;
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(PlanHourStreamJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdHourStreamJob.class.getResourceAsStream(isTest ? "/application.test.properties" : "/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);
@@ -70,7 +66,7 @@ public class PlanHourStreamJob {
             env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.FLINK_CHECKPOINT_SAVEPOINT));
         }
 
-        KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planHourConsumerGroup);
+        KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(isTest, props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planHourConsumerGroup);
         DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "planHourSource_kafka");
 
         // 广告分钟数据(前 5分钟的广告消耗数据)
@@ -114,6 +110,6 @@ public class PlanHourStreamJob {
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfHourDWD.class))
                 .name("sink_plan_hour_dwd");
 
-        env.execute("plan_hour_stream_job");
+        env.execute(isTest ? "plan_hour_stream_job_test" : "plan_hour_stream_job");
     }
 }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/kafka/KafkaComponent.java

@@ -25,7 +25,7 @@ public class KafkaComponent {
         }
     }
 
-    public static KafkaSource<String> buildKafkaSource(Properties props, String topic, String groupId) {
+    public static KafkaSource<String> buildKafkaSource(boolean isTest, Properties props, String topic, String groupId) {
         String connModule = props.getProperty(KafkaProperties.KAFKA_MODULE);
         Properties kafkaProps = new Properties();
         kafkaProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
@@ -61,7 +61,7 @@ public class KafkaComponent {
         return KafkaSource.<String>builder()
                 .setBootstrapServers(props.getProperty(KafkaProperties.KAFKA_SERVERS))
                 .setTopics(topic)
-                .setGroupId(groupId)
+                .setGroupId(isTest ? groupId + "_test" : groupId)
                 .setProperties(kafkaProps)
                 // .setStartingOffsets(OffsetsInitializer.earliest())
                 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))