Explorar o código

初始化项目

cola %!s(int64=2) %!d(string=hai) anos
pai
achega
7f891e09dc

+ 1 - 1
src/main/java/com/qucheng/game/data/oldsystem/Env.java

@@ -1,5 +1,5 @@
 package com.qucheng.game.data.oldsystem;
 
 public class Env {
-    public static final boolean isTest = false;
+    public static final boolean isTest = true;
 }

+ 138 - 0
src/main/java/com/qucheng/game/data/oldsystem/ods/ByteDailyCost.java

@@ -0,0 +1,138 @@
+package com.qucheng.game.data.oldsystem.ods;
+
+import com.qucheng.game.data.oldsystem.Env;
+import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.CdcMysqlTablesSink;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class ByteDailyCost {
+    private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
+            .savePath("")
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build() : FlinkAppConfigParam.builder()
+            .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + ByteDailyCost.class.getSimpleName())
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build();
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("HADOOP_USER_NAME", "flink");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 加载配置文件到 flink的全局配置中
+        Properties props = new Properties();
+        props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+        Configuration configuration = new Configuration();
+        props.stringPropertyNames().forEach(key -> {
+            String value = props.getProperty(key);
+            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
+        });
+        env.getConfig().setGlobalJobParameters(configuration);
+        // 设置默认并行度
+        env.setParallelism(1);
+
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
+
+        // checkpoint配置
+        env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
+        // checkpoint执行超时时间,超时则 checkpoint失败
+        env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
+        // checkpoint执行最小间隔时间
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
+        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
+        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
+        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
+        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+        // 设置状态后端
+        env.setStateBackend(new HashMapStateBackend());
+        // 设置检查点目录
+        if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
+            env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
+        }
+
+        MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
+                .hostname(props.getProperty("cdc.mysql.backup.host"))
+                .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
+                .username(props.getProperty("cdc.mysql.backup.username"))
+                .password(props.getProperty("cdc.mysql.backup.password"))
+                .databaseList("ods")
+                .tableList((StringUtils.join(new String[]{
+                        "ods.byte_t_ad_data_day"
+                }, ",")))
+                .deserializer(new MapDebeziumDeserializationSchema())
+                .startupOptions(StartupOptions.initial())
+                .build();
+
+        MysqlConfigParam sinkMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.quChengText.url"))
+                .username(props.getProperty("mysql.quChengText.username"))
+                .password(props.getProperty("mysql.quChengText.password"))
+                .build();
+
+        DataStreamSource<TransportMap> cdcSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource");
+        cdcSource.print();
+        cdcSource.map(new FieldFilterMap())
+                .addSink(new CdcMysqlTablesSink(sinkMysqlConfig));
+
+        env.execute(ByteDailyCost.class.getSimpleName());
+    }
+
+    /**
+     * 字段过滤和映射
+     */
+    public static class FieldFilterMap implements MapFunction<TransportMap, TransportMap> {
+        @Override
+        public TransportMap map(TransportMap transportMap) throws Exception {
+            if (transportMap.getAfter() == null) {
+                return transportMap;
+            }
+            Map<String, Object> after = transportMap.getAfter();
+            if (transportMap.getTableName().equalsIgnoreCase("byte_t_ad_data_day")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                pojo.put("account_id",after.get("account_id"));
+                pojo.put("date",after.get("day"));
+                pojo.put("cost",after.get("cost"));
+                pojo.put("view_count",after.get("show"));
+                pojo.put("valid_click_count",after.get("click"));
+                pojo.put("ctr",after.get("ctr"));
+                after = pojo;
+                transportMap.setTableName("daily_tt");
+                transportMap.setPrimaryKeys(Arrays.asList("account_id", "day"));
+                transportMap.setAfter(after);
+            }
+            return transportMap;
+        }
+    }
+}

+ 2 - 2
src/main/java/com/qucheng/game/data/oldsystem/ods/OldGameSystemCDCBgNew.java

@@ -86,7 +86,7 @@ public class OldGameSystemCDCBgNew {
                 .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
                 .username(props.getProperty("cdc.mysql.backup.username"))
                 .password(props.getProperty("cdc.mysql.backup.password"))
-                .databaseList("bg_new_sdk_db_mp,bg_old_sdk_db_mp,ods")
+                .databaseList("bg_new_sdk_db_mp")
                 .tableList((StringUtils.join(new String[]{
                         "bg_new_sdk_db_mp.h_game",
                         "bg_new_sdk_db_mp.h_mem_game",
@@ -97,7 +97,7 @@ public class OldGameSystemCDCBgNew {
                         "bg_new_sdk_db_mp.h_user"
                 }, ",")))
                 .deserializer(new MapDebeziumDeserializationSchema())
-                .startupOptions(StartupOptions.initial())
+                .startupOptions(StartupOptions.latest())
                 .build();
 
         MysqlConfigParam sinkMysqlConfig = MysqlConfigParam.builder()

+ 2 - 2
src/main/java/com/qucheng/game/data/oldsystem/ods/OldGameSystemCDCBgOld.java

@@ -86,7 +86,7 @@ public class OldGameSystemCDCBgOld {
                 .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
                 .username(props.getProperty("cdc.mysql.backup.username"))
                 .password(props.getProperty("cdc.mysql.backup.password"))
-                .databaseList("bg_new_sdk_db_mp,bg_old_sdk_db_mp,ods")
+                .databaseList("bg_old_sdk_db_mp")
                 .tableList((StringUtils.join(new String[]{
                         "bg_old_sdk_db_mp.h_game",
                         "bg_old_sdk_db_mp.h_mem_game",
@@ -97,7 +97,7 @@ public class OldGameSystemCDCBgOld {
                         "bg_old_sdk_db_mp.h_user"
                 }, ",")))
                 .deserializer(new MapDebeziumDeserializationSchema())
-                .startupOptions(StartupOptions.initial())
+                .startupOptions(StartupOptions.latest())
                 .build();
 
         MysqlConfigParam sinkMysqlConfig = MysqlConfigParam.builder()

+ 1 - 0
src/main/java/com/qucheng/game/data/oldsystem/sink/CdcMysqlTablesSink.java

@@ -67,6 +67,7 @@ public class CdcMysqlTablesSink extends RichSinkFunction<TransportMap> {
         List<String> primaryKeys = value.getPrimaryKeys();
         Map<String, Object> data = value.getAfter();
         if (Envelope.Operation.DELETE == value.getOperation()) {
+            data = value.getBefore();
             // 删除
             Map<String, Object> delParamMap = primaryKeys.stream().collect(Collectors.toMap(Function.identity(), data::get));
             try (SqlSession session = sqlSessionFactory.openSession(true)) {

+ 1 - 1
src/main/resources/log4j2.properties

@@ -23,4 +23,4 @@ appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-com.qucheng.game.data.ods.dao.mapper = info
+com.qucheng.game.data.oldsystem.dao.mapper = INFO