
Add checkpoint configuration

wcc, 3 years ago
parent
current commit 5156dc0c06

+ 2 - 0
.gitignore

@@ -1,3 +1,5 @@
+*.properties
+
 target/
 !.mvn/wrapper/maven-wrapper.jar
 !**/src/main/**/target/

+ 12 - 0
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -182,6 +182,18 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
+      <version>1.14.0</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>frocksdbjni</artifactId>
+          <groupId>com.ververica</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>

+ 7 - 0
flink-ad-monitoring/pom.xml

@@ -70,6 +70,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <!-- Enable Flink's disk-backed (RocksDB) checkpointing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
 

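A note on the scope: provided keeps the RocksDB state backend out of the shaded job jar, on the assumption that the Flink runtime on the cluster supplies it; for local IDE runs the scope would need to be compile. A minimal sketch of what the dependency enables (Flink 1.14 APIs; the checkpoint path is illustrative):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints"); // illustrative path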
+ 26 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -4,6 +4,7 @@ import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollMonthProcess;
 import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollYearProcess;
 import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToCkBatchSink;
@@ -16,9 +17,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
@@ -43,6 +47,26 @@ public class AdDayStreamJob {
         });
         env.getConfig().setGlobalJobParameters(configuration);
 
+        // Checkpoint configuration
+        env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
+        // Checkpoint timeout; a checkpoint that runs longer than this fails
+        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
+        // Minimum pause between two consecutive checkpoints
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
+        // Maximum number of concurrent checkpoints; with minPauseBetweenCheckpoints set, this is effectively 1
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // Enable externalized checkpoints; they are not cleaned up automatically when the job fails,
+        // so retained state must be removed by hand. ExternalizedCheckpointCleanup controls cleanup on cancel:
+        // DELETE_ON_CANCELLATION: delete the external state when the job is canceled (kept if the job FAILED)
+        // RETAIN_ON_CANCELLATION: retain the external state when the job is canceled
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // Number of checkpoint failures to tolerate before the job fails over; default 0
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
+        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
+        if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT))) {
+            env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT));
+        }
+
         KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
 
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adDaySource_kafka");
@@ -57,7 +81,7 @@ public class AdDayStreamJob {
                 .map(AdStatOfDayODSDTO::byJson);
 
         // Write to the raw (ODS) table
-        new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 1000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class))
                 .name("sink_ad_day_ods");
@@ -91,7 +115,7 @@ public class AdDayStreamJob {
 
         DataStream<AdStatOfDayDWD> adDayStream = adDayDWDMonthStream.union(adDayDWDYearStream);
         // Write to MaxCompute
-        new KeyedBatchStream<>("adDayStream", adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>("adDayStream", adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 1000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .name("sink_ad_year_dwd");

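The checkpoint block added above is repeated almost verbatim (only the interval differs: 3 minutes for the day jobs, 2 for the hour jobs) in AdHourStreamJob, PlanDayStreamJob, and PlanHourStreamJob below. A sketch of how it could be factored out; CheckpointSetup is hypothetical and not part of this commit:

    import java.util.Properties;
    import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Hypothetical helper: one place for the checkpoint settings shared by all four jobs.
    public final class CheckpointSetup {
        private CheckpointSetup() {}

        public static void apply(StreamExecutionEnvironment env, Properties props, long intervalMs) {
            env.enableCheckpointing(intervalMs, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental checkpoints
            String storage = props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT);
            if (StringUtils.isNotBlank(storage)) {
                env.getCheckpointConfig().setCheckpointStorage(storage);
            }
        }
    }

Each job would then reduce to CheckpointSetup.apply(env, props, 3 * 60 * 1000L) for the day jobs or CheckpointSetup.apply(env, props, 2 * 60 * 1000L) for the hour jobs.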
+ 30 - 6
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -1,6 +1,7 @@
 package flink.zanxiangnet.ad.monitoring;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.*;
 import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.AdHourDMToCkBatchSink;
@@ -15,9 +16,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +51,26 @@ public class AdHourStreamJob {
         });
         env.getConfig().setGlobalJobParameters(configuration);
 
+        // Checkpoint configuration
+        env.enableCheckpointing(2 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
+        // Checkpoint timeout; a checkpoint that runs longer than this fails
+        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
+        // Minimum pause between two consecutive checkpoints
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
+        // Maximum number of concurrent checkpoints; with minPauseBetweenCheckpoints set, this is effectively 1
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // Enable externalized checkpoints; they are not cleaned up automatically when the job fails,
+        // so retained state must be removed by hand. ExternalizedCheckpointCleanup controls cleanup on cancel:
+        // DELETE_ON_CANCELLATION: delete the external state when the job is canceled (kept if the job FAILED)
+        // RETAIN_ON_CANCELLATION: retain the external state when the job is canceled
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // Number of checkpoint failures to tolerate before the job fails over; default 0
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
+        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
+        if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT))) {
+            env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT));
+        }
+
         KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
         DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka");
 
@@ -64,7 +88,7 @@ public class AdHourStreamJob {
         // Minute stream
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
         // Minute stream: write to the raw (ODS) table
-        new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 4000L, 2 * 60 * 1000L)
+        new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 1000L, 2 * 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
                 .name("sink_ad_minute_ods");
@@ -79,7 +103,7 @@ public class AdHourStreamJob {
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new AdMinuteODSStreamTrigger())
                 .process(new AdMinuteDWDProcess());
-        new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 4000L, 60 * 1000L)
+        new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 1000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))
                 .name("sink_ad_minute_dwd");
@@ -89,7 +113,7 @@ public class AdHourStreamJob {
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
                         .process(new CostMinuteProcess());
-        new BatchStream<>("adMinuteDMStream", clickhouseMinuteDmStream, 1000L, 60 * 1000L)
+        new BatchStream<>("adMinuteDMStream", clickhouseMinuteDmStream, 4000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new AdMinuteDMToCkBatchSink())
                 .name("sink_ad_minute_dm_clickhouse");
@@ -97,7 +121,7 @@ public class AdHourStreamJob {
         // Hour stream
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // Hour stream: write to the raw (ODS) table
-        new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 4000L, 3 * 60 * 1000L)
+        new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 1000L, 3 * 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
                 .name("sink_ad_hour_ods");
@@ -108,7 +132,7 @@ public class AdHourStreamJob {
                         .process(new AdHourDWDProcess());
 
         // Hour stream: write to MaxCompute
-        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 3 * 60 * 1000L)
+        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 1000L, 3 * 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .name("sink_ad_hour_dwd");
@@ -124,7 +148,7 @@ public class AdHourStreamJob {
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .process(new CostHourProcess());
-        new BatchStream<>("adHourDMStream", adHourDMStream, 1000L, 60 * 1000L)
+        new BatchStream<>("adHourDMStream", adHourDMStream, 2000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new AdHourDMToCkBatchSink())
                 .name("sink_ad_hour_dm_clickhouse");

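Note the two tuning directions in this file: the MaxCompute tunnel sinks shrink their batches (4000 → 1000 records) while the ClickHouse DM sinks grow theirs (1000 → 4000 and 2000). Reading the call sites — the BatchStream/KeyedBatchStream sources are not shown in this diff, so the parameter roles are inferred — a batch flushes when either the size cap or the interval elapses:

    // Usage sketch; parameter roles inferred from the call sites above.
    new KeyedBatchStream<>(
            "adMinuteODSStream",                                    // stream name
            adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), // keyed input
            1000L,                                                  // max records per batch (assumed)
            2 * 60 * 1000L)                                         // max ms before a flush (assumed)
            .toBatch()
            .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class))
            .name("sink_ad_minute_ods");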
+ 24 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -4,6 +4,7 @@ import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.PlanDayDWDRollMonthProcess;
 import flink.zanxiangnet.ad.monitoring.process.PlanDayDWDRollYearProcess;
 import flink.zanxiangnet.ad.monitoring.sink.PlanDayDWDToCkBatchSink;
@@ -16,9 +17,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
@@ -41,6 +45,26 @@ public class PlanDayStreamJob {
             configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
         });
         env.getConfig().setGlobalJobParameters(configuration);
+
+        // Checkpoint configuration
+        env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
+        // Checkpoint timeout; a checkpoint that runs longer than this fails
+        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
+        // Minimum pause between two consecutive checkpoints
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
+        // Maximum number of concurrent checkpoints; with minPauseBetweenCheckpoints set, this is effectively 1
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // Enable externalized checkpoints; they are not cleaned up automatically when the job fails,
+        // so retained state must be removed by hand. ExternalizedCheckpointCleanup controls cleanup on cancel:
+        // DELETE_ON_CANCELLATION: delete the external state when the job is canceled (kept if the job FAILED)
+        // RETAIN_ON_CANCELLATION: retain the external state when the job is canceled
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // Number of checkpoint failures to tolerate before the job fails over; default 0
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
+        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
+        if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT))) {
+            env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT));
+        }
         KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planDayConsumerGroup);
 
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "planDaySource_kafka");

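Because the jobs externalize checkpoints with RETAIN_ON_CANCELLATION, the last checkpoint of a canceled job survives on the checkpoint storage, and the job can be restarted from it with Flink's standard savepoint flag, e.g. bin/flink run -s <retainedCheckpointPath> ...; the retained directories then have to be cleaned up by hand, as the comments above note.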
+ 25 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java

@@ -1,6 +1,7 @@
 package flink.zanxiangnet.ad.monitoring;
 
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.PlanHourDTOStreamProcess;
 import flink.zanxiangnet.ad.monitoring.process.PlanHourDWDProcess;
 import flink.zanxiangnet.ad.monitoring.process.PlanHourStreamCompletionProcess;
@@ -14,9 +15,13 @@ import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -45,6 +50,26 @@ public class PlanHourStreamJob {
         });
         env.getConfig().setGlobalJobParameters(configuration);
 
+        // Checkpoint configuration
+        env.enableCheckpointing(2 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
+        // Checkpoint timeout; a checkpoint that runs longer than this fails
+        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
+        // Minimum pause between two consecutive checkpoints
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
+        // Maximum number of concurrent checkpoints; with minPauseBetweenCheckpoints set, this is effectively 1
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // Enable externalized checkpoints; they are not cleaned up automatically when the job fails,
+        // so retained state must be removed by hand. ExternalizedCheckpointCleanup controls cleanup on cancel:
+        // DELETE_ON_CANCELLATION: delete the external state when the job is canceled (kept if the job FAILED)
+        // RETAIN_ON_CANCELLATION: retain the external state when the job is canceled
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // Number of checkpoint failures to tolerate before the job fails over; default 0
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
+        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
+        if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT))) {
+            env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT));
+        }
+
         KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planHourConsumerGroup);
         DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "planHourSource_kafka");
 

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/kafka/KafkaComponent.java

@@ -17,11 +17,11 @@ public class KafkaComponent {
         public static final String adDayTopic = "ad_day_cost_topic";
 
         public static class KafkaGroupId {
-            public static final String adHourConsumerGroup = "ad_hour_consumer1";
-            public static final String adDayConsumerGroup = "ad_day_consumer1";
+            public static final String adHourConsumerGroup = "ad_hour_consumer";
+            public static final String adDayConsumerGroup = "ad_day_consumer";
 
-            public static final String planHourConsumerGroup = "plan_hour_consumer1";
-            public static final String planDayConsumerGroup = "plan_day_consumer1";
+            public static final String planHourConsumerGroup = "plan_hour_consumer";
+            public static final String planDayConsumerGroup = "plan_day_consumer";
         }
     }
 

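Dropping the 1 suffix points all four jobs at brand-new consumer groups, which have no committed offsets; where consumption starts then depends on the offsets initializer configured in KafkaComponent.buildKafkaSource (not shown in this diff). A sketch of pinning that down explicitly, using the Flink 1.14 KafkaSource builder; the broker address and topic here are illustrative:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")   // illustrative
            .setTopics("ad_hour_cost_topic")
            .setGroupId("ad_hour_consumer")
            // A renamed group has no committed offsets; fall back to EARLIEST instead of failing.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();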
+ 22 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfDayDWD.java

@@ -1541,6 +1541,28 @@ public class AdStatOfDayDWD implements Serializable {
     public static AdStatOfDayDWD byHourDWD(AdStatOfHourDWD hourDWD) {
         AdStatOfDayDWD result = new AdStatOfDayDWD();
         BeanUtils.copyProperties(hourDWD, result);
+        result.setViewUserCountTotal(0L);
+        result.setViewUserCountDay(0L);
+        result.setAvgViewPerUserAll(0.0);
+        result.setAvgViewPerUserDay(0.0);
+        result.setClickUserCountTotal(0L);
+        result.setClickUserCountDay(0L);
+        result.setKeyPageUvTotal(0L);
+        result.setKeyPageUvDay(0L);
+        result.setAddWishlistCountTotal(0L);
+        result.setAddWishlistCountDay(0L);
+        result.setViewCommodityPageUvTotal(0L);
+        result.setViewCommodityPageUvDay(0L);
+        result.setPageReservationCountTotal(0L);
+        result.setPageReservationCountDay(0L);
+        result.setLeadsPurchaseUvTotal(0L);
+        result.setLeadsPurchaseUvDay(0L);
+        result.setLeadsPurchaseCostAll(0L);
+        result.setLeadsPurchaseCostDay(0L);
+        result.setLeadsPurchaseRateAll(0.0);
+        result.setLeadsPurchaseRateDay(0.0);
+        result.setOfficialAccountFollowCostAll(0L);
+        result.setOfficialAccountFollowCostDay(0L);
         return result;
     }
 

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/properties/ApplicationProperties.java

@@ -16,4 +16,6 @@ public class ApplicationProperties {
     public static final String OSS_ASSESS_KEY_SECRET = "oss.accessKeySecret";
     public static final String OSS_ENDPOINT = "oss.endpoint";
     public static final String OSS_BUCKET = "oss.bucket";
+
+    public static final String CHECKPOINT_SAVEPOINT = "checkpoint.savePath";
 }

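Each job reads this key via props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT) and only overrides the checkpoint storage when the value is non-blank, so omitting it falls back to the cluster default. A sample entry for the (now git-ignored) properties file; the URI is an assumption, and any Flink-supported filesystem scheme works:

    # Checkpoint storage location (illustrative)
    checkpoint.savePath=hdfs:///flink/checkpoints/ad-monitoring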
+ 21 - 27
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/TunnelBatchStreamSink.java

@@ -11,6 +11,7 @@ import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -30,11 +31,8 @@ import java.util.stream.Collectors;
  *
  * @param <IN>
  */
+@Slf4j
 public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<IN> {
-    private static final Logger log = LoggerFactory.getLogger(TunnelBatchStreamSink.class);
-
-    // Object lock to keep the MaxCompute Tunnel from being initialized more than once
-    private static final Object DUMMY_LOCK = new Object();
 
     private final Class<T> clazz;
     private String projectName;
@@ -50,30 +48,26 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
 
     @Override
     public void open(Configuration config) {
-        if (tunnel == null) {
-            synchronized (DUMMY_LOCK) {
-                if (tunnel == null) {
-                    Map<String, String> params = getRuntimeContext()
-                            .getExecutionConfig()
-                            .getGlobalJobParameters()
-                            .toMap();
-                    MaxComputeTable tableAnnotation = clazz.getAnnotation(MaxComputeTable.class);
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
+        MaxComputeTable tableAnnotation = clazz.getAnnotation(MaxComputeTable.class);
 
-                    Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
-                            params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
-                    Odps odps = new Odps(account);
-                    odps.getRestClient().setRetryLogger(new MaxComputeLog());
-                    odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
-                    odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
-                    tunnel = new TableTunnel(odps);
-                    tunnel.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT));
-                    projectName = params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME);
-                    tableName = tableAnnotation.value();
-                    fieldInfoList = BeanUtil.parseBeanField(clazz);
-                    partitionFieldMethods = fieldInfoList.stream().filter(BeanUtil.FieldInfo::isUsePartitioned).collect(Collectors.toMap(BeanUtil.FieldInfo::getColumnName, BeanUtil.FieldInfo::getGetMethod));
-                }
-            }
-        }
+        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
+                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
+        Odps odps = new Odps(account);
+        odps.getRestClient().setRetryLogger(new MaxComputeLog());
+        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
+        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+        tunnel = new TableTunnel(odps);
+        tunnel.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT));
+        projectName = params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME);
+        tableName = tableAnnotation.value();
+        fieldInfoList = BeanUtil.parseBeanField(clazz);
+        partitionFieldMethods = fieldInfoList.stream()
+                .filter(BeanUtil.FieldInfo::isUsePartitioned)
+                .collect(Collectors.toMap(BeanUtil.FieldInfo::getColumnName, BeanUtil.FieldInfo::getGetMethod));
     }
 
     /**
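The double-checked locking removed above appears to have guarded per-instance state: RichSinkFunction#open runs exactly once per parallel subtask, each subtask has its own sink instance, and open completes before any invoke, so plain assignment suffices. A minimal sketch of that lifecycle assumption, with illustrative types:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Illustrative sink: open() initializes per-subtask state once, so no locking is needed.
    public class ExampleSink extends RichSinkFunction<String> {
        private transient StringBuilder buffer; // one per parallel subtask, never shared

        @Override
        public void open(Configuration config) {
            buffer = new StringBuilder(); // runs once per subtask, before any invoke()
        }

        @Override
        public void invoke(String value, Context context) {
            buffer.append(value).append('\n');
        }
    }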