Jelajahi Sumber

初始化项目

cola 2 tahun lalu
induk
melakukan
9cca413663

+ 5 - 0
src/main/java/com/qucheng/game/data/oldsystem/Env.java

@@ -0,0 +1,5 @@
+package com.qucheng.game.data.oldsystem;
+
+public class Env {
+    public static final boolean isTest = false;
+}

+ 43 - 0
src/main/java/com/qucheng/game/data/oldsystem/dao/mapper/BaseMapper.java

@@ -0,0 +1,43 @@
+package com.qucheng.game.data.oldsystem.dao.mapper;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface BaseMapper {
+
+    Map<String, Object> getById(@Param("tableName") String tableName,
+                                @Param("primaryKeyMap") Map<String, Object> primaryKeyMap);
+
+    Map<String, Object> queryOne(@Param("tableName") String tableName,
+                                 @Param("queryParamMap") Map<String, Object> queryParamMap);
+
+    List<Map<String, Object>> queryList(@Param("tableName") String tableName,
+                                        @Param("queryParamMap") Map<String, Object> queryParamMap);
+
+    default int saveOrUpdate(String tableName, List<String> primaryKeys, Map<String, Object> obj) {
+        List<String> fieldList = new ArrayList<>(obj.size());
+        List<Object> valueList = new ArrayList<>(obj.size());
+        Map<String, Object> updateParamMap = new HashMap<>(obj.size());
+        for (Map.Entry<String, Object> entry : obj.entrySet()) {
+            fieldList.add(entry.getKey());
+            valueList.add(entry.getValue());
+            if (primaryKeys.contains(entry.getKey())) {
+                continue;
+            }
+            updateParamMap.put(entry.getKey(), entry.getValue());
+        }
+        return insertOrUpdate(tableName, fieldList, valueList, updateParamMap);
+    }
+
+    int insertOrUpdate(@Param("tableName") String tableName,
+                       @Param("fieldList") List<String> fieldList,
+                       @Param("valueList") List<Object> valueList,
+                       @Param("updateParamMap") Map<String, Object> updateParamMap);
+
+    int delById(@Param("tableName") String tableName,
+                @Param("primaryKeyMap") Map<String, Object> primaryKeyMap);
+}

+ 54 - 0
src/main/java/com/qucheng/game/data/oldsystem/dao/mapper/BaseMapper.xml

@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="com.qucheng.game.data.oldsystem.dao.mapper.BaseMapper">
+
+    <select id="getById" resultType="java.util.Map">
+        SELECT * FROM ${tableName} WHERE
+        <foreach collection="primaryKeyMap" index="key" item="value" separator="and">
+            `${key}` = #{value}
+        </foreach>
+    </select>
+
+    <select id="queryOne" resultType="java.util.Map">
+        SELECT * FROM ${tableName}
+        <where>
+            <foreach collection="queryParamMap" index="key" item="value" separator="and">
+                <if test="value != null">`${key}` = #{value}</if>
+            </foreach>
+        </where>
+        LIMIT 1
+    </select>
+
+    <select id="queryList" resultType="java.util.Map">
+        SELECT * FROM ${tableName}
+        <where>
+            <foreach collection="queryParamMap" index="key" item="value" separator="and">
+                <if test="value != null">`${key}` = #{value}</if>
+            </foreach>
+        </where>
+    </select>
+
+    <insert id="insertOrUpdate">
+        INSERT INTO ${tableName} (
+        <foreach collection="fieldList" item="fieldName" separator=",">
+            `${fieldName}`
+        </foreach>
+        ) VALUES(
+        <foreach collection="valueList" item="value" separator=",">
+            #{value}
+        </foreach>
+        ) ON DUPLICATE KEY UPDATE
+        <foreach collection="updateParamMap" index="key" item="value" separator=",">
+            `${key}` = #{value}
+        </foreach>
+    </insert>
+
+    <delete id="delById">
+        DELETE FROM ${tableName} WHERE
+        <foreach collection="primaryKeyMap" index="key" item="value" separator=",">
+            `${key}` = #{value}
+        </foreach>
+    </delete>
+
+</mapper>

+ 331 - 0
src/main/java/com/qucheng/game/data/oldsystem/ods/OldGameSystemCDCBgNew.java

@@ -0,0 +1,331 @@
+package com.qucheng.game.data.oldsystem.ods;
+
+import com.qucheng.game.data.oldsystem.Env;
+import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.CdcMysqlTablesSink;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class OldGameSystemCDCBgNew {
+    private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
+            .savePath("")
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build() : FlinkAppConfigParam.builder()
+            .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + OldGameSystemCDCBgNew.class.getSimpleName())
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build();
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("HADOOP_USER_NAME", "flink");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 加载配置文件到 flink的全局配置中
+        Properties props = new Properties();
+        props.load(OldGameSystemCDCBgNew.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+        Configuration configuration = new Configuration();
+        props.stringPropertyNames().forEach(key -> {
+            String value = props.getProperty(key);
+            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
+        });
+        env.getConfig().setGlobalJobParameters(configuration);
+        // 设置默认并行度
+        env.setParallelism(1);
+
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
+
+        // checkpoint配置
+        env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
+        // checkpoint执行超时时间,超时则 checkpoint失败
+        env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
+        // checkpoint执行最小间隔时间
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
+        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
+        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
+        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
+        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+        // 设置状态后端
+        env.setStateBackend(new HashMapStateBackend());
+        // 设置检查点目录
+        if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
+            env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
+        }
+
+        MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
+                .hostname(props.getProperty("cdc.mysql.backup.host"))
+                .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
+                .username(props.getProperty("cdc.mysql.backup.username"))
+                .password(props.getProperty("cdc.mysql.backup.password"))
+                .databaseList("bg_new_sdk_db_mp,bg_old_sdk_db_mp,ods")
+                .tableList((StringUtils.join(new String[]{
+                        "bg_new_sdk_db_mp.h_game",
+                        "bg_new_sdk_db_mp.h_mem_game",
+                        "bg_new_sdk_db_mp.h_member",
+                        "bg_new_sdk_db_mp.h_mg_role",
+                        "bg_new_sdk_db_mp.h_pay",
+                        "bg_new_sdk_db_mp.h_pay_ext",
+                        "bg_new_sdk_db_mp.h_user"
+                }, ",")))
+                .deserializer(new MapDebeziumDeserializationSchema())
+                .startupOptions(StartupOptions.initial())
+                .build();
+
+        MysqlConfigParam sinkMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.oldGameSystemBgNewOds.url"))
+                .username(props.getProperty("mysql.oldGameSystemBgNewOds.username"))
+                .password(props.getProperty("mysql.oldGameSystemBgNewOds.password"))
+                .build();
+
+        env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource")
+                .map(new FieldFilterMap())
+                .addSink(new CdcMysqlTablesSink(sinkMysqlConfig));
+
+        env.execute(OldGameSystemCDCBgNew.class.getSimpleName());
+    }
+
+    /**
+     * 字段过滤和映射
+     */
+    public static class FieldFilterMap implements MapFunction<TransportMap, TransportMap> {
+        @Override
+        public TransportMap map(TransportMap transportMap) throws Exception {
+            if (transportMap.getAfter() == null) {
+                return transportMap;
+            }
+            Map<String, Object> after = transportMap.getAfter();
+            if (transportMap.getTableName().equalsIgnoreCase("h_game")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList("id",
+                        "name",
+                        "en_name",
+                        "en_abbr",
+                        "app_key",
+                        "tags",
+                        "category",
+                        "classify",
+                        "icon",
+                        "cp_payback_url",
+                        "cp_id",
+                        "parent_id",
+                        "package_name",
+                        "pay_switch",
+                        "order_switch",
+                        "pay_show",
+                        "float_is_show",
+                        "status",
+                        "is_delete",
+                        "delete_time",
+                        "is_online",
+                        "is_sdk",
+                        "list_order",
+                        "rise_order",
+                        "hot_order",
+                        "like_order",
+                        "publicity",
+                        "language",
+                        "description",
+                        "image",
+                        "run_time",
+                        "create_time",
+                        "update_time",
+                        "fine_order",
+                        "promote_switch",
+                        "apple_id",
+                        "add_cp_time",
+                        "is_bt",
+                        "is_auth",
+                        "single_tag")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_mem_game")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "mem_id",
+                        "guided_agent_id",
+                        "app_id",
+                        "nickname",
+                        "create_time",
+                        "update_time",
+                        "is_default",
+                        "like",
+                        "star_cnt",
+                        "status",
+                        "sum_money")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_member")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "nickname",
+                        "from_device",
+                        "device_id",
+                        "app_id",
+                        "agent_id",
+                        "status",
+                        "create_time",
+                        "update_time",
+                        "username")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_mg_role")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "mg_mem_id",
+                        "app_id",
+                        "server_id",
+                        "server_name",
+                        "role_id",
+                        "role_name",
+                        "role_level",
+                        "role_vip",
+                        "money",
+                        "combat_num",
+                        "ext",
+                        "os",
+                        "create_time",
+                        "update_time"
+                )) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_pay")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "order_id",
+                        "cp_order_id",
+                        "mem_id",
+                        "mg_mem_id",
+                        "agent_id",
+                        "app_id",
+                        "currency",
+                        "amount",
+                        "real_amount",
+                        "product_id",
+                        "product_cnt",
+                        "product_name",
+                        "coupon_amount",
+                        "ptb_amount",
+                        "gm_amount",
+                        "integral",
+                        "integral_money",
+                        "rebate_amount",
+                        "rate",
+                        "status",
+                        "cp_status",
+                        "payway",
+                        "mobile_prefix",
+                        "is_handle",
+                        "pay_time",
+                        "create_time",
+                        "update_time",
+                        "is_distribute",
+                        "notify_cnt",
+                        "last_notify_time",
+                        "ext",
+                        "mem_note",
+                        "admin_note",
+                        "remark",
+                        "is_switch",
+                        "vb_id")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_pay_ext")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList("pay_id",
+                        "product_id",
+                        "product_name",
+                        "product_desc",
+                        "product_cnt",
+                        "device_id",
+                        "mac",
+                        "ip",
+                        "brand",
+                        "model",
+                        "os",
+                        "os_version",
+                        "screen",
+                        "net",
+                        "imsi",
+                        "longitude",
+                        "latitude",
+                        "userua",
+                        "server_id",
+                        "server_name",
+                        "role_id",
+                        "role_name",
+                        "role_level",
+                        "money",
+                        "role_vip",
+                        "online_time",
+                        "scene",
+                        "axis",
+                        "last_operation",
+                        "party_name",
+                        "role_balance",
+                        "experience",
+                        "attach",
+                        "type",
+                        "rolelevel_ctime",
+                        "rolelevel_mtime")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_user")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList("id",
+                        "user_login",
+                        "user_nicename",
+                        "create_time",
+                        "role_id",
+                        "parent_id",
+                        "mem_id",
+                        "account_id",
+                        "parent_account_id",
+                        "cp_id")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            }
+            transportMap.setAfter(after);
+            return transportMap;
+        }
+    }
+}

+ 331 - 0
src/main/java/com/qucheng/game/data/oldsystem/ods/OldGameSystemCDCBgOld.java

@@ -0,0 +1,331 @@
+package com.qucheng.game.data.oldsystem.ods;
+
+import com.qucheng.game.data.oldsystem.Env;
+import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.CdcMysqlTablesSink;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class OldGameSystemCDCBgOld {
+    private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
+            .savePath("")
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build() : FlinkAppConfigParam.builder()
+            .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + OldGameSystemCDCBgOld.class.getSimpleName())
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build();
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("HADOOP_USER_NAME", "flink");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 加载配置文件到 flink的全局配置中
+        Properties props = new Properties();
+        props.load(OldGameSystemCDCBgOld.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+        Configuration configuration = new Configuration();
+        props.stringPropertyNames().forEach(key -> {
+            String value = props.getProperty(key);
+            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
+        });
+        env.getConfig().setGlobalJobParameters(configuration);
+        // 设置默认并行度
+        env.setParallelism(1);
+
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
+
+        // checkpoint配置
+        env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
+        // checkpoint执行超时时间,超时则 checkpoint失败
+        env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
+        // checkpoint执行最小间隔时间
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
+        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
+        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
+        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
+        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+        // 设置状态后端
+        env.setStateBackend(new HashMapStateBackend());
+        // 设置检查点目录
+        if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
+            env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
+        }
+
+        MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
+                .hostname(props.getProperty("cdc.mysql.backup.host"))
+                .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
+                .username(props.getProperty("cdc.mysql.backup.username"))
+                .password(props.getProperty("cdc.mysql.backup.password"))
+                .databaseList("bg_new_sdk_db_mp,bg_old_sdk_db_mp,ods")
+                .tableList((StringUtils.join(new String[]{
+                        "bg_old_sdk_db_mp.h_game",
+                        "bg_old_sdk_db_mp.h_mem_game",
+                        "bg_old_sdk_db_mp.h_member",
+                        "bg_old_sdk_db_mp.h_mg_role",
+                        "bg_old_sdk_db_mp.h_pay",
+                        "bg_old_sdk_db_mp.h_pay_ext",
+                        "bg_old_sdk_db_mp.h_user"
+                }, ",")))
+                .deserializer(new MapDebeziumDeserializationSchema())
+                .startupOptions(StartupOptions.initial())
+                .build();
+
+        MysqlConfigParam sinkMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.oldGameSystemBgOldOds.url"))
+                .username(props.getProperty("mysql.oldGameSystemBgOldOds.username"))
+                .password(props.getProperty("mysql.oldGameSystemBgOldOds.password"))
+                .build();
+
+        env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource")
+                .map(new FieldFilterMap())
+                .addSink(new CdcMysqlTablesSink(sinkMysqlConfig));
+
+        env.execute(OldGameSystemCDCBgOld.class.getSimpleName());
+    }
+
+    /**
+     * 字段过滤和映射
+     */
+    public static class FieldFilterMap implements MapFunction<TransportMap, TransportMap> {
+        @Override
+        public TransportMap map(TransportMap transportMap) throws Exception {
+            if (transportMap.getAfter() == null) {
+                return transportMap;
+            }
+            Map<String, Object> after = transportMap.getAfter();
+            if (transportMap.getTableName().equalsIgnoreCase("h_game")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList("id",
+                        "name",
+                        "en_name",
+                        "en_abbr",
+                        "app_key",
+                        "tags",
+                        "category",
+                        "classify",
+                        "icon",
+                        "cp_payback_url",
+                        "cp_id",
+                        "parent_id",
+                        "package_name",
+                        "pay_switch",
+                        "order_switch",
+                        "pay_show",
+                        "float_is_show",
+                        "status",
+                        "is_delete",
+                        "delete_time",
+                        "is_online",
+                        "is_sdk",
+                        "list_order",
+                        "rise_order",
+                        "hot_order",
+                        "like_order",
+                        "publicity",
+                        "language",
+                        "description",
+                        "image",
+                        "run_time",
+                        "create_time",
+                        "update_time",
+                        "fine_order",
+                        "promote_switch",
+                        "apple_id",
+                        "add_cp_time",
+                        "is_bt",
+                        "is_auth",
+                        "single_tag")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_mem_game")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "mem_id",
+                        "guided_agent_id",
+                        "app_id",
+                        "nickname",
+                        "create_time",
+                        "update_time",
+                        "is_default",
+                        "like",
+                        "star_cnt",
+                        "status",
+                        "sum_money")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_member")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "nickname",
+                        "from_device",
+                        "device_id",
+                        "app_id",
+                        "agent_id",
+                        "status",
+                        "create_time",
+                        "update_time",
+                        "username")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_mg_role")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "mg_mem_id",
+                        "app_id",
+                        "server_id",
+                        "server_name",
+                        "role_id",
+                        "role_name",
+                        "role_level",
+                        "role_vip",
+                        "money",
+                        "combat_num",
+                        "ext",
+                        "os",
+                        "create_time",
+                        "update_time"
+                )) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_pay")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList(
+                        "id",
+                        "order_id",
+                        "cp_order_id",
+                        "mem_id",
+                        "mg_mem_id",
+                        "agent_id",
+                        "app_id",
+                        "currency",
+                        "amount",
+                        "real_amount",
+                        "product_id",
+                        "product_cnt",
+                        "product_name",
+                        "coupon_amount",
+                        "ptb_amount",
+                        "gm_amount",
+                        "integral",
+                        "integral_money",
+                        "rebate_amount",
+                        "rate",
+                        "status",
+                        "cp_status",
+                        "payway",
+                        "mobile_prefix",
+                        "is_handle",
+                        "pay_time",
+                        "create_time",
+                        "update_time",
+                        "is_distribute",
+                        "notify_cnt",
+                        "last_notify_time",
+                        "ext",
+                        "mem_note",
+                        "admin_note",
+                        "remark",
+                        "is_switch",
+                        "vb_id")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_pay_ext")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList("pay_id",
+                        "product_id",
+                        "product_name",
+                        "product_desc",
+                        "product_cnt",
+                        "device_id",
+                        "mac",
+                        "ip",
+                        "brand",
+                        "model",
+                        "os",
+                        "os_version",
+                        "screen",
+                        "net",
+                        "imsi",
+                        "longitude",
+                        "latitude",
+                        "userua",
+                        "server_id",
+                        "server_name",
+                        "role_id",
+                        "role_name",
+                        "role_level",
+                        "money",
+                        "role_vip",
+                        "online_time",
+                        "scene",
+                        "axis",
+                        "last_operation",
+                        "party_name",
+                        "role_balance",
+                        "experience",
+                        "attach",
+                        "type",
+                        "rolelevel_ctime",
+                        "rolelevel_mtime")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            } else if (transportMap.getTableName().equalsIgnoreCase("h_user")) {
+                Map<String, Object> pojo = new HashMap<>(after.size());
+                for (String field : Arrays.asList("id",
+                        "user_login",
+                        "user_nicename",
+                        "create_time",
+                        "role_id",
+                        "parent_id",
+                        "mem_id",
+                        "account_id",
+                        "parent_account_id",
+                        "cp_id")) {
+                    pojo.put(field, after.get(field));
+                }
+                after = pojo;
+            }
+            transportMap.setAfter(after);
+            return transportMap;
+        }
+    }
+}

+ 29 - 0
src/main/java/com/qucheng/game/data/oldsystem/pojo/FlinkAppConfigParam.java

@@ -0,0 +1,29 @@
+package com.qucheng.game.data.oldsystem.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class FlinkAppConfigParam {
+    /**
+     * 保存路径
+     */
+    private String savePath;
+    /**
+     * checkpoint间隔时间(秒)
+     */
+    private Long interval;
+    /**
+     * checkpoint超时时间(秒)
+     */
+    private Long timeout;
+    /**
+     * checkpoint最小间隔时间(秒)
+     */
+    private Long minBetween;
+}

+ 25 - 0
src/main/java/com/qucheng/game/data/oldsystem/pojo/MysqlConfigParam.java

@@ -0,0 +1,25 @@
+package com.qucheng.game.data.oldsystem.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class MysqlConfigParam implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Builder.Default
+    private String driverClassName = "com.mysql.cj.jdbc.Driver";
+
+    private String url;
+
+    private String username;
+
+    private String password;
+}

+ 39 - 0
src/main/java/com/qucheng/game/data/oldsystem/pojo/TransportMap.java

@@ -0,0 +1,39 @@
+package com.qucheng.game.data.oldsystem.pojo;
+
+import io.debezium.data.Envelope;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class TransportMap implements Serializable {
+    private static final long serialVersionUID = 1L;
+    /**
+     * 数据库名
+     */
+    private String dbName;
+    /**
+     * 表名
+     */
+    private String tableName;
+    /**
+     * 主键
+     */
+    private List<String> primaryKeys;
+    /**
+     * 操作类型
+     */
+    private Envelope.Operation operation;
+
+    private Map<String, Object> before;
+
+    private Map<String, Object> after;
+}

+ 67 - 0
src/main/java/com/qucheng/game/data/oldsystem/serialization/MapDebeziumDeserializationSchema.java

@@ -0,0 +1,67 @@
+package com.qucheng.game.data.oldsystem.serialization;
+
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.qucheng.game.data.oldsystem.util.CDCStructUtil;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.data.Envelope;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MapDebeziumDeserializationSchema implements DebeziumDeserializationSchema<TransportMap> {
+
+    @Override
+    public void deserialize(SourceRecord sourceRecord, Collector<TransportMap> collector) {
+        //解析库名&表名
+        String[] topics = sourceRecord.topic().split("\\.");
+        String dbName = topics[1];
+        String tableName = topics[2];
+
+        List<String> primaryKeys = null;
+        //解析主键
+        Struct key = (Struct) sourceRecord.key();
+        if (key != null) {
+            primaryKeys = new ArrayList<>(key.schema().fields().size());
+            for (Field field : key.schema().fields()) {
+                primaryKeys.add(field.name());
+            }
+        }
+
+        // 解析值
+        Struct struct = (Struct) sourceRecord.value();
+        Struct after = struct.getStruct("after");
+        Struct before = struct.getStruct("before");
+        //获取操作类型
+        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
+
+        Map<String, Object> beforeBean = null;
+        if (before != null) {
+            beforeBean = CDCStructUtil.structToMap(before);
+        }
+        Map<String, Object> afterBean = null;
+        if (after != null) {
+            afterBean = CDCStructUtil.structToMap(after);
+        }
+        collector.collect(TransportMap.builder()
+                .dbName(dbName)
+                .tableName(tableName)
+                .primaryKeys(primaryKeys)
+                .operation(operation)
+                .before(beforeBean)
+                .after(afterBean)
+                .build()
+        );
+    }
+
+    @Override
+    public TypeInformation<TransportMap> getProducedType() {
+        return Types.POJO(TransportMap.class);
+    }
+}

+ 88 - 0
src/main/java/com/qucheng/game/data/oldsystem/sink/CdcMysqlTablesSink.java

@@ -0,0 +1,88 @@
+package com.qucheng.game.data.oldsystem.sink;
+
+import com.qucheng.game.data.oldsystem.dao.mapper.BaseMapper;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.data.Envelope;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import javax.sql.DataSource;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class CdcMysqlTablesSink extends RichSinkFunction<TransportMap> {
+    private final Map<String, String> tableNameMap;
+    private final MysqlConfigParam mysqlConfigParam;
+    protected SqlSessionFactory sqlSessionFactory;
+
+    public CdcMysqlTablesSink(MysqlConfigParam mysqlConfigParam) {
+        this(null, mysqlConfigParam);
+    }
+
+    public CdcMysqlTablesSink(Map<String, String> tableNameMap, MysqlConfigParam mysqlConfigParam) {
+        this.tableNameMap = tableNameMap == null ? Collections.emptyMap() : tableNameMap;
+        this.mysqlConfigParam = mysqlConfigParam;
+    }
+
+    @Override
+    public void open(Configuration configuration) {
+        HikariConfig config = new HikariConfig();
+        config.setDriverClassName(mysqlConfigParam.getDriverClassName());
+        config.setJdbcUrl(mysqlConfigParam.getUrl());
+        config.setUsername(mysqlConfigParam.getUsername());
+        config.setPassword(mysqlConfigParam.getPassword());
+        //连接池内保留的最少连接数
+        config.setMinimumIdle(1);
+        DataSource dataSource = new HikariDataSource(config);
+
+        Environment mybatisEnv = new Environment("mysql-sink", new JdbcTransactionFactory(), dataSource);
+
+        org.apache.ibatis.session.Configuration mybatisConfig = new org.apache.ibatis.session.Configuration(mybatisEnv);
+        mybatisConfig.addMapper(BaseMapper.class);
+        sqlSessionFactory = new SqlSessionFactoryBuilder().build(mybatisConfig);
+    }
+
+    @Override
+    public void invoke(TransportMap value, Context context) {
+        if (Envelope.Operation.TRUNCATE == value.getOperation()) {
+            return;
+        }
+        if (value.getPrimaryKeys() == null || value.getPrimaryKeys().isEmpty()) {
+            return;
+        }
+        String tableName = tableNameMap.get(value.getTableName());
+        tableName = StringUtils.isBlank(tableName) ? value.getTableName() : tableName;
+        List<String> primaryKeys = value.getPrimaryKeys();
+        Map<String, Object> data = value.getAfter();
+        if (Envelope.Operation.DELETE == value.getOperation()) {
+            // 删除
+            Map<String, Object> delParamMap = primaryKeys.stream().collect(Collectors.toMap(Function.identity(), data::get));
+            try (SqlSession session = sqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+                mapper.delById(tableName, delParamMap);
+            }
+        } else {
+            // insertOrUpdate
+            try (SqlSession session = sqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+                mapper.saveOrUpdate(tableName, primaryKeys, data);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+}

+ 176 - 0
src/main/java/com/qucheng/game/data/oldsystem/util/CDCStructUtil.java

@@ -0,0 +1,176 @@
+package com.qucheng.game.data.oldsystem.util;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+public class CDCStructUtil {
+    /**
+     * mysql内 date格式直接获取是 int数字,需要用数字加上下面的日期获取真正的时间
+     */
+    private static final LocalDate BEGIN_DAY = LocalDate.of(1970, 1, 1);
+    /**
+     * mysql返回的 datetime是 1970-1-1(-28800000)到今天的毫秒数
+     */
+    private static final Long BEGIN_TIME = LocalDateTime.of(1970, 1, 1, 0, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
+
+    private static final DateTimeFormatter FORMAT_TIMESTAMP = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+    public static <T> T structToBean(Struct struct, Class<T> clazz) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+        Schema schema = struct.schema();
+        List<Field> fieldList = schema.fields();
+        Map<String, java.lang.reflect.Field> beanFieldMap = parseClass(clazz);
+
+        T t = clazz.newInstance();
+        for (Field field : fieldList) {
+            java.lang.reflect.Field beanField = beanFieldMap.get(lineToHump(field.name()));
+            if (beanField == null) {
+                continue;
+            }
+            Method method = methodOfSet(clazz, beanField);
+            method.invoke(t, fieldTransform(struct.get(field), beanField));
+        }
+        return t;
+    }
+
+    public static Map<String, Object> structToMap(Struct struct) {
+        Schema schema = struct.schema();
+        List<Field> fieldList = schema.fields();
+        Map<String, Object> result = new HashMap<>(fieldList.size());
+
+        for (Field field : fieldList) {
+            Object obj = struct.get(field);
+            if (obj != null) {
+                if (field.schema().type() == Schema.Type.INT64 && "io.debezium.time.Timestamp".equals(field.schema().name())) {
+                    // datetime类型
+                    obj = DateUtil.milliToLocalDateTime(ObjectUtil.objToLong(obj) + BEGIN_TIME);
+                } else if (field.schema().type() == Schema.Type.INT32 && "io.debezium.time.Date".equals(field.schema().name())) {
+                    // date类型
+                    obj = BEGIN_DAY.plusDays(ObjectUtil.objToLong(obj));
+                } else if (field.schema().type() == Schema.Type.STRING && "io.debezium.time.ZonedTimestamp".equals(field.schema().name())) {
+                    // timestamp类型
+                    // System.out.println(field.name() + ":: " + field.schema().name() + ", " + obj.getClass() + ", " + obj);
+                    obj = LocalDateTime.parse(ObjectUtil.objToString(obj), FORMAT_TIMESTAMP).plusHours(8);
+                } else if (field.schema().type() == Schema.Type.INT64 && "io.debezium.time.MicroTime".equals(field.schema().name())) {
+                    // time类型
+                    obj = LocalTime.MIN.plusNanos(ObjectUtil.objToLong(obj) * 1000);
+                } else if (field.schema().type() == Schema.Type.BYTES && obj instanceof ByteBuffer) {
+                    // blob类型
+                    obj = ((ByteBuffer) obj).array();
+                }
+            }
+            result.put(field.name(), obj);
+        }
+        return result;
+    }
+
+    private static <T> Map<String, java.lang.reflect.Field> parseClass(Class<T> clazz) {
+        List<java.lang.reflect.Field> fieldList = new ArrayList<>();
+        Class<?> tempClazz = clazz;
+        while (tempClazz != null) {
+            java.lang.reflect.Field[] fields = tempClazz.getDeclaredFields();
+            if (fields.length != 0) {
+                fieldList.addAll(Arrays.asList(fields));
+            }
+            tempClazz = tempClazz.getSuperclass();
+        }
+        if (fieldList.size() == 0) {
+            throw new RuntimeException("Failed parsed class[" + clazz.getName() + "]. No Field!");
+        }
+        Map<String, java.lang.reflect.Field> result = new HashMap<>(fieldList.size());
+        for (java.lang.reflect.Field field : fieldList) {
+            if (Modifier.isFinal(field.getModifiers()) || Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            result.put(field.getName(), field);
+        }
+        if (result.isEmpty()) {
+            throw new RuntimeException("Failed parsed class[" + clazz.getName() + "]. No available Field!");
+        }
+        return result;
+    }
+
+    private static Object fieldTransform(Object obj, java.lang.reflect.Field beanField) {
+        if (obj == null) {
+            return null;
+        }
+        Class<?> fieldType = beanField.getType();
+        if (obj.getClass() == fieldType) {
+            return obj;
+        }
+        if (fieldType == Integer.class) {
+            return ObjectUtil.objToInteger(obj);
+        }
+        if (fieldType == Long.class) {
+            return ObjectUtil.objToLong(obj);
+        }
+        if (fieldType == String.class) {
+            return ObjectUtil.objToString(obj);
+        }
+        if (fieldType == BigDecimal.class) {
+            return ObjectUtil.objToBigDecimal(obj);
+        }
+        if (fieldType == Boolean.class) {
+            return ObjectUtil.objToBoolean(obj);
+        }
+        if (fieldType == LocalDate.class) {
+            if (obj instanceof Integer) {
+                return BEGIN_DAY.plusDays((Integer) obj);
+            }
+            return ObjectUtil.objToLocalDate(obj);
+        }
+        if (fieldType == LocalDateTime.class) {
+            if (obj instanceof Long) {
+                // CDC给的时间戳是已经转换过时区的(此处如果用 DateUtil里面的 milliToLocalDateTime()的话,又会 +8小时
+                return Instant.ofEpochMilli((Long) obj).atZone(ZoneOffset.ofHours(0)).toLocalDateTime();
+            }
+            return ObjectUtil.objToLocalDateTime(obj);
+        }
+        throw new RuntimeException("Failed transform field! Field: " + beanField.getName() + ", Object: " + obj.getClass() + ", ObjectValue: " + obj + ", field: " + fieldType);
+    }
+
+    private static <T> Method methodOfSet(Class<T> clazz, java.lang.reflect.Field field) throws NoSuchMethodException {
+        String fieldName = field.getName();
+        String methodName = "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+        return clazz.getMethod(methodName, field.getType());
+    }
+
+    /**
+     * 下划线转驼峰
+     *
+     * @param str
+     * @return
+     */
+    private static String lineToHump(String str) {
+        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+        byte[] result = new byte[str.length()];
+        int index = 0;
+        for (int i = 0; i < bytes.length; i++) {
+            if (bytes[i] != 95) {
+                result[index++] = bytes[i];
+                continue;
+            }
+            if (i == bytes.length - 1) {
+                result[index++] = bytes[i];
+                continue;
+            }
+            byte nextChar = bytes[++i];
+            if (97 <= nextChar && nextChar <= 122) {
+                result[index++] = (byte) (nextChar - 32);
+            } else {
+                result[index++] = nextChar;
+            }
+        }
+        return new String(result, 0, index);
+    }
+}

+ 435 - 0
src/main/java/com/qucheng/game/data/oldsystem/util/DateUtil.java

@@ -0,0 +1,435 @@
+package com.qucheng.game.data.oldsystem.util;
+
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import java.lang.management.ManagementFactory;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * 时间工具类
+ *
+ * @author ruoyi
+ */
+public class DateUtil {
+    public static String YYYY_MM_DD = "yyyy-MM-dd";
+
+    public static String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
+
+    public static String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+
+
+    public static final DateTimeFormatter FORMAT_DATETIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    public static final DateTimeFormatter FORMAT_FOR_KEY = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
+    public static final DateTimeFormatter FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    public static final DateTimeFormatter FORMAT_TIME = DateTimeFormatter.ofPattern("HH:mm:ss");
+
+    public static long localDateToSecond(LocalDate localDate) {
+
+        return localDate.atStartOfDay(ZoneOffset.ofHours(8)).toEpochSecond();
+    }
+
+    public static long localDateToMilli(LocalDate localDate) {
+        return localDate.atStartOfDay(ZoneOffset.ofHours(8)).toInstant().toEpochMilli();
+    }
+
+    public static Date localDateToDate(LocalDate localDate) {
+        return Date.from(localDate.atStartOfDay(ZoneOffset.ofHours(8)).toInstant());
+    }
+
+    public static LocalDateTime localDateToLocalDateTime(LocalDate localDate) {
+        return LocalDateTime.of(localDate, LocalTime.MIN);
+    }
+
+    public static long localDateTimeToSecond(LocalDateTime localDateTime) {
+        return localDateTime.toEpochSecond(ZoneOffset.ofHours(8));
+    }
+
+    public static long localDateTimeToMilli(LocalDateTime localDateTime) {
+        return localDateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
+    }
+
+    public static Date localDateTimeToDate(LocalDateTime localDateTime) {
+        return Date.from(localDateTime.atZone(ZoneOffset.ofHours(8)).toInstant());
+    }
+
+    public static LocalDate secondToLocalDate(long second) {
+        return Instant.ofEpochSecond(second).atZone(ZoneOffset.ofHours(8)).toLocalDate();
+    }
+
+    public static LocalDate milliToLocalDate(long milli) {
+        return Instant.ofEpochMilli(milli).atZone(ZoneOffset.ofHours(8)).toLocalDate();
+    }
+
+    public static long milliToSecond(long milli) {
+        return milli / 1000;
+    }
+
+    public static int milliToSecond2(long milli) {
+        return ((Long) (milli / 1000)).intValue();
+    }
+
+    public static LocalDateTime secondToLocalDateTime(long second) {
+        return Instant.ofEpochSecond(second).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
+    }
+
+    public static long secondToMilli(int second) {
+        return second * 1000;
+    }
+
+    public static long secondToMilli(long second) {
+        return second * 1000;
+    }
+
+    public static LocalDateTime milliToLocalDateTime(long milli) {
+        return Instant.ofEpochMilli(milli).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
+    }
+
+    public static LocalDate dateToLocalDate(Date date) {
+        return date.toInstant().atZone(ZoneOffset.ofHours(8)).toLocalDate();
+    }
+
+    public static LocalDateTime dateToLocalDateTime(Date date) {
+        return date.toInstant().atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
+    }
+
+    public static String format(TemporalAccessor temporal, DateTimeFormatter formatter) {
+        return formatter.format(temporal);
+    }
+
+    public static String formatLocalDate(LocalDate localDate) {
+        return FORMAT_DATE.format(localDate);
+    }
+
+    public static String formatLocalDateTime(LocalDateTime localDateTime) {
+        return FORMAT_DATETIME.format(localDateTime);
+    }
+
+    public static LocalDate parseLocalDate(String dateStr) {
+        return LocalDate.parse(dateStr, FORMAT_DATE);
+    }
+
+    public static LocalDateTime parseLocalDateTime(String dateStr) {
+        return LocalDateTime.parse(dateStr, FORMAT_DATETIME);
+    }
+
+    /**
+     * 2个日期的时间间隔
+     *
+     * @param beginDate
+     * @param endDate
+     * @return
+     */
+    public static long intervalOfDays(LocalDate beginDate, LocalDate endDate) {
+        return endDate.toEpochDay() - beginDate.toEpochDay();
+    }
+
+    /**
+     * 2个日期的时间间隔
+     *
+     * @param beginDate
+     * @param endDate
+     * @return
+     */
+    public static long intervalOfHour(LocalDateTime beginDate, LocalDateTime endDate) {
+        return ChronoUnit.HOURS.between(beginDate, endDate);
+    }
+
+    /**
+     * 2个日期的时间间隔
+     *
+     * @param beginDate
+     * @param endDate
+     * @return
+     */
+    public static long intervalOfMinute(LocalDateTime beginDate, LocalDateTime endDate) {
+        return ChronoUnit.MINUTES.between(beginDate, endDate);
+    }
+
+    public static List<LocalDate> splitByWeek(LocalDate startLocalDate, LocalDate endLocalDate) {
+        if (startLocalDate.compareTo(endLocalDate) > 0) {
+            throw new RuntimeException("startLocalDate must be less than endLocalDate");
+        }
+        List<LocalDate> localDates = new ArrayList<>();
+        LocalDate lastMonday = startLocalDate.with(DayOfWeek.MONDAY).plusDays(7);// 下周一
+        do {
+            localDates.add(startLocalDate);
+            startLocalDate = lastMonday;
+            lastMonday = lastMonday.plusDays(7);
+        } while (startLocalDate.compareTo(endLocalDate) < 0);
+        localDates.add(endLocalDate);
+        return localDates;
+    }
+
+    /**
+     * 将时间段按照月份切割
+     * <p>
+     * ex:2020-01-30  ~  2020-01-30   =>   ["2020-01-30","2020-01-30"]
+     * ex:2020-01-30  ~  2020-02-01   =>   ["2020-01-30","2020-02-01"]
+     * ex:2020-01-30  ~  2020-03-03   =>   ["2020-01-30","2020-02-01","2020-03-01","2020-03-03"]
+     *
+     * @param startLocalDate
+     * @param endLocalDate
+     * @return
+     */
+    public static List<LocalDate> splitByMonth(LocalDate startLocalDate, LocalDate endLocalDate) {
+        if (startLocalDate.compareTo(endLocalDate) > 0) {
+            throw new RuntimeException("startLocalDate must be less than endLocalDate");
+        }
+        List<LocalDate> localDates = new ArrayList<>();
+        LocalDate lastMonth = startLocalDate.withDayOfMonth(1).plusMonths(1);// 下月 1号
+        do {
+            localDates.add(startLocalDate);
+            startLocalDate = lastMonth;
+            lastMonth = startLocalDate.plusMonths(1);
+        } while (startLocalDate.compareTo(endLocalDate) < 0);
+        localDates.add(endLocalDate);
+        return localDates;
+    }
+
+    /**
+     * 将时间段按天切割
+     *
+     * @param startLocalDate
+     * @param endLocalDate
+     * @return
+     */
+    public static List<LocalDate> splitByDay(LocalDate startLocalDate, LocalDate endLocalDate) {
+        if (startLocalDate.compareTo(endLocalDate) > 0) {
+            throw new RuntimeException("startLocalDate must be less than endLocalDate");
+        }
+        List<LocalDate> localDates = new ArrayList<>();
+        long day = endLocalDate.toEpochDay() - startLocalDate.toEpochDay();
+        for (int i = 0; i <= day; i++) {
+            localDates.add(startLocalDate.plusDays(i));
+        }
+        return localDates;
+    }
+
+    /**
+     * 获取指定年月的天数
+     *
+     * @param year
+     * @param month
+     * @return
+     */
+    public static long daysOfYearMonth(int year, int month) {
+        LocalDate start = LocalDate.of(year, month, 1);
+        if (month == 12) {
+            year += 1;
+            month = 1;
+        } else {
+            month += 1;
+        }
+        LocalDate end = LocalDate.of(year, month, 1);
+        return intervalOfDays(start, end);
+    }
+
+    public String localDateFormat(LocalDate localDate, String formatStr) {
+        return localDate.format(DateTimeFormatter.ofPattern(formatStr));
+    }
+
+    public String localDateFormat(LocalDate localDate, DateTimeFormatter formatter) {
+        return localDate.format(formatter);
+    }
+
+    /**
+     * 获取当前日期, 默认格式为yyyy-MM-dd
+     *
+     * @return String
+     */
+    public static String getDate() {
+        return dateTimeNow(YYYY_MM_DD);
+    }
+
+    public static final String getTime() {
+        return dateTimeNow(YYYY_MM_DD_HH_MM_SS);
+    }
+
+    public static final String dateTimeNow() {
+        return dateTimeNow(YYYYMMDDHHMMSS);
+    }
+
+    public static final String dateTimeNow(final String format) {
+        return parseDateToStr(format, new Date());
+    }
+
+    public static final String dateTime(final Date date) {
+        return parseDateToStr(YYYY_MM_DD, date);
+    }
+
+    public static final String parseDateToStr(final String format, final Date date) {
+        return new SimpleDateFormat(format).format(date);
+    }
+
+    public static final Date dateTime(final String format, final String ts) {
+        try {
+            return new SimpleDateFormat(format).parse(ts);
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * 日期路径 即年/月/日 如2018/08/08
+     */
+    public static final String datePath() {
+        Date now = new Date();
+        return DateFormatUtils.format(now, "yyyy/MM/dd");
+    }
+
+    /**
+     * 日期路径 即年/月/日 如20180808
+     */
+    public static final String dateTime() {
+        Date now = new Date();
+        return DateFormatUtils.format(now, "yyyyMMdd");
+    }
+
+    /**
+     * 获取服务器启动时间
+     */
+    public static Date getServerStartDate() {
+        long time = ManagementFactory.getRuntimeMXBean().getStartTime();
+        return new Date(time);
+    }
+
+    /**
+     * 计算两个时间差
+     */
+    public static String getDatePoor(Date endDate, Date nowDate) {
+        long nd = 1000 * 24 * 60 * 60;
+        long nh = 1000 * 60 * 60;
+        long nm = 1000 * 60;
+        // long ns = 1000;
+        // 获得两个时间的毫秒时间差异
+        long diff = endDate.getTime() - nowDate.getTime();
+        // 计算差多少天
+        long day = diff / nd;
+        // 计算差多少小时
+        long hour = diff % nd / nh;
+        // 计算差多少分钟
+        long min = diff % nd % nh / nm;
+        // 计算差多少秒//输出结果
+        // long sec = diff % nd % nh % nm / ns;
+        return day + "天" + hour + "小时" + min + "分钟";
+    }
+
+    /**
+     * String转换成LocalDate,标准时间字符串
+     *
+     * @param date "2021-01-01"
+     * @return
+     */
+    public static LocalDateTime string2LocalDate(String date) {
+        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        return LocalDateTime.parse(date, fmt);
+    }
+
+    /**
+     * 特殊日期字符串处理
+     *
+     * @param dateTime “2021-01-01T00:00:00+08:00”
+     * @return
+     */
+    public static LocalDateTime string2LocalDateTime(String dateTime) throws ParseException {
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+        Date date = df.parse(dateTime);
+        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.UK);
+        date = sdf.parse(date.toString());
+        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        return string2LocalDate(df.format(date));
+    }
+
+    /**
+     * 获得某天最小时间 2020-08-19 00:00:00
+     *
+     * @param oneDayTime : 某天的时间
+     * @return : 返回某天零点时间13位时间戳
+     */
+    public static long getStartOfDay(Long oneDayTime) {
+        LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(oneDayTime), ZoneId.systemDefault());
+        LocalDateTime startOfDay = localDateTime.with(LocalTime.MIN);
+        return Date.from(startOfDay.atZone(ZoneId.systemDefault()).toInstant()).getTime();
+    }
+
+    /**
+     * 获得某天最大时间 2021-08-19 23:59:59
+     *
+     * @param oneDayTime : 某天的时间
+     * @return : 返回某天23:59:59的13位时间戳
+     */
+    public static long getEndOfDay(Long oneDayTime) {
+        LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(oneDayTime), ZoneId.systemDefault());
+        LocalDateTime endOfDay = localDateTime.with(LocalTime.MAX);
+        return Date.from(endOfDay.atZone(ZoneId.systemDefault()).toInstant()).getTime();
+    }
+
+    public static boolean equals(LocalDateTime date1, LocalDateTime date2) {
+        if (date1 == null && date2 == null) {
+            return true;
+        }
+        if (date1 == null || date2 == null) {
+            return false;
+        }
+        return date1.compareTo(date2) == 0;
+    }
+
+    public static boolean equals(LocalDate date1, LocalDate date2) {
+        if (date1 == null && date2 == null) {
+            return true;
+        }
+        if (date1 == null || date2 == null) {
+            return false;
+        }
+        return date1.compareTo(date2) == 0;
+    }
+
+    //两个时间的小时差
+    public static long hourDiff(String date1, String date2) throws ParseException {
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date d1 = df.parse(date1);
+        Date d2 = df.parse(date2);
+
+        long diff = d1.getTime() - d2.getTime();//这样得到的差值是微秒级别
+        long days = diff / (1000 * 60 * 60 * 24);
+        long hours = (diff - days * (1000 * 60 * 60 * 24)) / (1000 * 60 * 60);
+        return hours;
+    }
+
+    public static long differentDays(String date1, String date2) throws ParseException {
+
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+        Date d1 = df.parse(date1);
+        Date d2 = df.parse(date2);
+
+        long diff = d1.getTime() - d2.getTime();//这样得到的差值是微秒级别
+        long days = diff / (1000 * 60 * 60 * 24);
+        return days;
+    }
+
+    /**
+     * 时间转换成时间戳,参数和返回值都是字符串
+     *
+     * @param s
+     * @return res
+     * @throws ParseException
+     */
+    public static Long dateToStamp(String s) throws ParseException {
+        String res;
+        //设置时间模版
+        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date date = simpleDateFormat.parse(s);
+        long ts = date.getTime();
+        return ts;
+    }
+}

+ 171 - 0
src/main/java/com/qucheng/game/data/oldsystem/util/JsonUtil.java

@@ -0,0 +1,171 @@
+package com.qucheng.game.data.oldsystem.util;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
+import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
+import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
+import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * @Author wcc
+ * @Date 2020/11/17 20:37
+ * @Version 1.0
+ * @Description
+ */
+@Slf4j
+public class JsonUtil {
+
+    /**
+     * 序列化和反序列化不带 @class属性
+     */
+    public static final ObjectMapper JACKSON = new ObjectMapper()
+            // json字符串转对象多余属性不报错
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+            // 对象转 json为 null不显示
+            .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+            // 指定要序列化的域,只序列化字段(包括 private字段),不对 get、set及 isXxx进行序列化。ANY指所有作用域的字段,包括 private
+            .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+            .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+            .setVisibility(PropertyAccessor.SETTER, JsonAutoDetect.Visibility.NONE)
+            .setVisibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE)
+            .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+
+    /**
+     * 序列化和反序列化带 @class属性
+     */
+    public static final ObjectMapper JACKSON_WITH_CLASS = new ObjectMapper()
+            // json字符串转对象多余属性不报错
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+            // 对象转 json为 null不显示
+            .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+            // 指定要序列化的域,只序列化字段(包括 private字段),不对 get、set及 isXxx进行序列化。ANY指所有作用域的字段,包括 private
+            .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+            .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+            .setVisibility(PropertyAccessor.SETTER, JsonAutoDetect.Visibility.NONE)
+            .setVisibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE)
+            // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
+            .activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
+                    ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY)
+            .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+
+    static {// 解决 java.time包里面的类的序列化问题
+        JACKSON.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+        JACKSON_WITH_CLASS.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+        JavaTimeModule timeModule = new JavaTimeModule();
+        timeModule.addDeserializer(LocalDate.class,
+                new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+        timeModule.addDeserializer(LocalDateTime.class,
+                new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+        timeModule.addSerializer(LocalDate.class,
+                new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+        timeModule.addSerializer(LocalDateTime.class,
+                new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+        JACKSON.registerModule(timeModule);
+        JACKSON_WITH_CLASS.registerModule(timeModule);
+    }
+
+    public static String toString(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        try {
+            return JACKSON.writeValueAsString(obj);
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static String toStringWithClass(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        try {
+            return JACKSON_WITH_CLASS.writeValueAsString(obj);
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T toObj(String json, Class<T> clazz) {
+        try {
+            return JACKSON.readValue(json, clazz);
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Object toObjWithClass(String json) {
+        try {
+            return JACKSON_WITH_CLASS.readValue(json, Object.class);
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> Collection<T> toList(String json, Class<? extends Collection> collectionClazz, Class<T> tClass) {
+        try {
+            return JACKSON.readValue(json, JACKSON.getTypeFactory().constructCollectionType(collectionClazz, tClass));
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Collection<Object> toListWithClass(String json, Class<? extends Collection> collectionClazz) {
+        try {
+            return JACKSON_WITH_CLASS.readValue(json, JACKSON_WITH_CLASS.getTypeFactory().constructCollectionType(collectionClazz, Object.class));
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <V> Map<String, V> toMap(String json, Class<? extends Map> mapClazz, Class<V> vClass) {
+        try {
+            return JACKSON.readValue(json, JACKSON.getTypeFactory().constructMapType(mapClazz, String.class, vClass));
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Map<String, Object> toMapWithClass(String json, Class<? extends Map> mapClazz) {
+        try {
+            return JACKSON_WITH_CLASS.readValue(json, JACKSON_WITH_CLASS.getTypeFactory().constructMapType(mapClazz, String.class, Object.class));
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <K, V> Map<K, V> toMap(String json, Class<? extends Map> mapClazz, Class<K> kClass, Class<V> vClass) {
+        try {
+            return JACKSON.readValue(json, JACKSON.getTypeFactory().constructMapType(mapClazz, kClass, vClass));
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 116 - 0
src/main/java/com/qucheng/game/data/oldsystem/util/MyKafkaUtil.java

@@ -0,0 +1,116 @@
+package com.qucheng.game.data.oldsystem.util;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+
+public class MyKafkaUtil {
+
+
+
+        //往kafka写入数据
+    public static KafkaSink<String> getKafkaSink(String KAFKA_BROKERS,String topic) {
+        return KafkaSink.<String>builder()
+                .setBootstrapServers(KAFKA_BROKERS)
+                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+                        .setTopic(topic)
+                        .setValueSerializationSchema(new SimpleStringSchema())
+                        .build())
+                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+                .build();
+    }
+
+    //读取kafka数据
+    public static KafkaSource<String> getKafkaSource(String KAFKA_BROKERS,String topic, String groupId, OffsetsInitializer offsets) {
+        /**
+         * 以下是kafka的offset设置
+         *      OffsetsInitializer.committedOffsets()
+         *      OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
+         *      OffsetsInitializer.timestamp(1592323200L)
+         *      OffsetsInitializer.earliest()
+         *      OffsetsInitializer.latest()
+         */
+        return KafkaSource.<String>builder()
+                .setProperty("partition.discovery.interval.ms", "60000")
+//                .setProperty("enable.auto.commit","false")
+//                .setProperty("auto.commit.interval.ms","1000")
+                .setBootstrapServers(KAFKA_BROKERS)
+                .setTopics(topic)
+                .setGroupId(groupId)
+                .setValueOnlyDeserializer(new SimpleStringSchema())
+                .setStartingOffsets(offsets)
+                .build();
+    }
+
+    public static String getKafkaDDL(String KAFKA_BROKERS,String topic, String groupId,String consumption) {
+        return " with ('connector' = 'kafka', " +
+                " 'topic' = '" + topic + "'," +
+                " 'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "', " +
+                " 'properties.group.id' = '" + groupId + "', " +
+                " 'format' = 'json', " +
+
+                " 'scan.startup.mode' = '" + consumption + "')";
+    }
+
+    /**
+     * Kafka-Sink DDL 语句
+     *
+     * @param topic 输出到 Kafka 的目标主题
+     * @return 拼接好的 Kafka-Sink DDL 语句
+     */
+    public static String getKafkaSinkDDL(String KAFKA_BROKERS,String topic) {
+        return " WITH ( " +
+                "  'connector' = 'kafka', " +
+                "  'topic' = '" + topic + "', " +
+                "  'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "', " +
+                "  'format' = 'json' " +
+                ")";
+    }
+
+    /**
+     * UpsertKafka-Sink DDL 语句
+     *
+     * @param topic 输出到 Kafka 的目标主题
+     * @return 拼接好的 UpsertKafka-Sink DDL 语句
+     */
+    public static String getUpsertKafkaDDL(String KAFKA_BROKERS,String topic) {
+        return " WITH ( " +
+                "  'connector' = 'upsert-kafka', " +
+                "  'topic' = '" + topic + "', " +
+                "  'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "', " +
+                "  'key.format' = 'json', " +
+                "  'value.format' = 'json' " +
+                ")";
+    }
+
+    /**
+     * topic_db主题的  Kafka-Source DDL 语句
+     *
+     * @param groupId 消费者组
+     * @return 拼接好的 Kafka 数据源 DDL 语句
+     */
+    public static String getTopicDb(String KAFKA_BROKERS,String topic,String groupId,String consumption) {
+        return "CREATE TABLE topic_db ( " +
+                "  `db` STRING, " +
+                "  `tableName` STRING, " +
+                "  `type` STRING, " +
+                "  `key` MAP<STRING,STRING>, " +
+                "  `after` MAP<STRING,STRING>, " +
+                "  `pt` AS PROCTIME()" +
+                ") " + getKafkaDDL(KAFKA_BROKERS,topic, groupId,consumption);
+    }
+
+    public static String getTopicDb1(String KAFKA_BROKERS,String topic,String groupId,String consumption) {
+        return "CREATE TABLE topic_db1 ( " +
+                "  `db` STRING, " +
+                "  `tableName` STRING, " +
+                "  `type` STRING, " +
+                "  `key` MAP<STRING,STRING>, " +
+                "  `after` MAP<STRING,STRING>, " +
+                "  `pt` AS PROCTIME()" +
+                ") " + getKafkaDDL(KAFKA_BROKERS,topic, groupId,consumption);
+    }
+}

+ 394 - 0
src/main/java/com/qucheng/game/data/oldsystem/util/ObjectUtil.java

@@ -0,0 +1,394 @@
+package com.qucheng.game.data.oldsystem.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Calendar;
+import java.util.Date;
+
+@Slf4j
+public class ObjectUtil {
+
+    public static <T> boolean objEquals(T t1, T t2) {
+        if (t1 == null && t2 == null) {
+            return true;
+        }
+        if (t1 == null || t2 == null) {
+            return false;
+        }
+        return t1.equals(t2);
+    }
+
+    public static Boolean objToBoolean(Object value) {
+        return objToBoolean(value, null);
+    }
+
+    public static Boolean objToBoolean(Object value, Boolean defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof Boolean) {
+                return (Boolean) value;
+            } else if (value instanceof Integer) {
+                return (Integer) value > 0;
+            } else if (value instanceof Long) {
+                return (Long) value > 0;
+            } else if (value instanceof Short) {
+                return (Short) value > 0;
+            } else if (value instanceof String) {
+                if ("true".equalsIgnoreCase((String) value)) {
+                    return true;
+                }
+                if ("false".equalsIgnoreCase((String) value)) {
+                    return false;
+                }
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> Boolean]");
+    }
+
+    public static String objToString(Object value) {
+        return objToString(value, null);
+    }
+
+    public static String objToString(Object value, String defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value instanceof String) {
+            return (String) value;
+        }
+        if (value instanceof Double) {
+            String result = BigDecimal.valueOf((Double) value).toString();
+            if (result.endsWith(".0")) {
+                result = result.substring(0, result.length() - 2);
+            }
+            return result;
+        }
+        if (value instanceof Date) {
+            return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format((Date) value);
+        }
+        if (value instanceof LocalDate) {
+            return DateUtil.formatLocalDate((LocalDate) value);
+        }
+        if (value instanceof LocalDateTime) {
+            return DateUtil.formatLocalDateTime((LocalDateTime) value);
+        }
+        return value.toString();
+    }
+
+    public static Double objToDouble(Object value) {
+        return objToDouble(value, null);
+    }
+
+    public static Double objToDouble(Object value, Double defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof Integer) {
+                return Double.valueOf((Integer) value);
+            }
+            if (value instanceof BigDecimal) {
+                return ((BigDecimal) value).doubleValue();
+            }
+            if (value instanceof Double) {
+                return (Double) value;
+            }
+            if (value instanceof String) {
+                return Double.parseDouble((String) value);
+            }
+            if (value instanceof Long) {
+                return Double.valueOf((Long) value);
+            }
+            if (value instanceof Float) {
+                return Double.valueOf((Float) value);
+            }
+            if (value instanceof Byte) {
+                return Double.valueOf((Byte) value);
+            }
+            if (value instanceof Short) {
+                return Double.valueOf((Short) value);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> Double]");
+    }
+
+    public static Integer objToInteger(Object value) {
+        return objToInteger(value, null);
+    }
+
+    public static Integer objToInteger(Object value, Integer defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof Integer) {
+                return (Integer) value;
+            }
+            if (value instanceof BigDecimal) {
+                return ((BigDecimal) value).intValue();
+            }
+            if (value instanceof Double) {
+                return ((Double) value).intValue();
+            }
+            if (value instanceof String) {
+                return Integer.parseInt((String) value);
+            }
+            if (value instanceof Long) {
+                return ((Long) value).intValue();
+            }
+            if (value instanceof Float) {
+                return ((Float) value).intValue();
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> Integer]");
+    }
+
+    public static Long objToLong(Object value) {
+        return objToLong(value, null);
+    }
+
+    public static Long objToLongNull(Object value) {
+        return objToLong(value, 0L);
+    }
+
+    public static Long objToLong(Object value, Long defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof Integer) {
+                return ((Integer) value).longValue();
+            }
+            if (value instanceof BigDecimal) {
+                return ((BigDecimal) value).longValue();
+            }
+            if (value instanceof Double) {
+                return ((Double) value).longValue();
+            }
+            if (value instanceof String) {
+                return Long.parseLong((String) value);
+            }
+            if (value instanceof Long) {
+                return (Long) value;
+            }
+            if (value instanceof Float) {
+                return ((Float) value).longValue();
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> Long]");
+    }
+
+    public static Float objToFloat(Object value) {
+        return objToFloat(value, null);
+    }
+
+    public static Float objToFloat(Object value, Float defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof Float) {
+                return (Float) value;
+            }
+            if (value instanceof Integer) {
+                return ((Integer) value).floatValue();
+            }
+            if (value instanceof BigDecimal) {
+                return ((BigDecimal) value).floatValue();
+            }
+            if (value instanceof Double) {
+                return ((Double) value).floatValue();
+            }
+            if (value instanceof String) {
+                return Float.parseFloat((String) value);
+            }
+            if (value instanceof Long) {
+                return ((Long) value).floatValue();
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> Float]");
+    }
+
+    public static BigDecimal objToBigDecimal(Object value) {
+        return objToBigDecimal(value, null);
+    }
+
+    public static BigDecimal objToBigDecimal(Object value, BigDecimal defaultValue) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof BigDecimal) {
+                return (BigDecimal) value;
+            }
+            if (value instanceof Float) {
+                return BigDecimal.valueOf((Float) value);
+            }
+            if (value instanceof Integer) {
+                return BigDecimal.valueOf((Integer) value);
+            }
+            if (value instanceof Double) {
+                return BigDecimal.valueOf((Double) value);
+            }
+            if (value instanceof String) {
+                return new BigDecimal((String) value);
+            }
+            if (value instanceof Long) {
+                return BigDecimal.valueOf((Long) value);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> BigDecimal]");
+    }
+
+    public static Date objToDate(Object value) {
+        return objToDate(value, null, "yyyy-MM-dd hh:mm:ss");
+    }
+
+    public static Date objToDate(Object value, String dateFormat) {
+        return objToDate(value, null, dateFormat);
+    }
+
+    public static Date objToDate(Object value, Date defaultValue) {
+        return objToDate(value, defaultValue, "yyyy-MM-dd hh:mm:ss");
+    }
+
+    public static Date objToDate(Object value, Date defaultValue, String dateFormat) {
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            if (value instanceof Date) {
+                return (Date) value;
+            }
+            if (value instanceof Calendar) {
+                return ((Calendar) value).getTime();
+            }
+            if (value instanceof String && dateFormat != null) {
+                return new SimpleDateFormat(dateFormat).parse((String) value);
+            }
+            if (value instanceof Number) {
+                String temp = value.toString();
+                if (temp.length() == 13) {
+                    return new Date(((Number) value).longValue());
+                } else {
+                    return new Date(((Number) value).longValue() * 1000);
+                }
+            }
+            if (value instanceof LocalDateTime) {
+                return Date.from(((LocalDateTime) value).atZone(ZoneOffset.ofHours(8)).toInstant());
+            }
+            if (value instanceof LocalDate) {
+                return Date.from(((LocalDate) value).atStartOfDay(ZoneOffset.ofHours(8)).toInstant());
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> Date]");
+    }
+
+    public static LocalDate objToLocalDate(Object value) {
+        return objToLocalDate(value, null, null);
+    }
+
+    public static LocalDate objToLocalDate(Object value, LocalDate defaultValue) {
+        return objToLocalDate(value, defaultValue, null);
+    }
+
+    public static LocalDate objToLocalDate(Object value, String dateFormat) {
+        return objToLocalDate(value, null, dateFormat);
+    }
+
+    public static LocalDate objToLocalDate(Object value, LocalDate defaultValue, String dateFormat) {
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value instanceof LocalDate) {
+            return (LocalDate) value;
+        }
+        if (value instanceof LocalDateTime) {
+            return ((LocalDateTime) value).toLocalDate();
+        }
+        if (value instanceof String) {
+            if (StringUtils.isBlank(dateFormat)) {
+                return DateUtil.parseLocalDate((String) value);
+            }
+            return LocalDate.parse((String) value, DateTimeFormatter.ofPattern(dateFormat));
+        }
+        if (value instanceof Number) {
+            String temp = value.toString();
+            if (temp.length() == 13) {
+                return DateUtil.milliToLocalDate(((Number) value).longValue());
+            } else {
+                return DateUtil.secondToLocalDate(((Number) value).intValue());
+            }
+        }
+        if (value instanceof Date) {
+            return ((Date) value).toInstant().atZone(ZoneOffset.ofHours(8)).toLocalDate();
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> LocalDate]");
+    }
+
+    public static LocalDateTime objToLocalDateTime(Object value) {
+        return objToLocalDateTime(value, null, null);
+    }
+
+    public static LocalDateTime objToLocalDateTime(Object value, LocalDateTime defaultValue) {
+        return objToLocalDateTime(value, defaultValue, null);
+    }
+
+    public static LocalDateTime objToLocalDateTime(Object value, String dateFormat) {
+        return objToLocalDateTime(value, null, dateFormat);
+    }
+
+    public static LocalDateTime objToLocalDateTime(Object value, LocalDateTime defaultValue, String dateFormat) {
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value instanceof LocalDate) {
+            return LocalDateTime.of((LocalDate) value, LocalTime.MIDNIGHT);
+        }
+        if (value instanceof LocalDateTime) {
+            return (LocalDateTime) value;
+        }
+        if (value instanceof String) {
+            if (StringUtils.isBlank(dateFormat)) {
+                return DateUtil.parseLocalDateTime((String) value);
+            }
+            return LocalDateTime.parse((String) value, DateTimeFormatter.ofPattern(dateFormat));
+        }
+        if (value instanceof Number) {
+            String temp = value.toString();
+            if (temp.length() == 13) {
+                return DateUtil.milliToLocalDateTime(((Number) value).longValue());
+            } else {
+                return DateUtil.secondToLocalDateTime(((Number) value).intValue());
+            }
+        }
+        if (value instanceof Date) {
+            return ((Date) value).toInstant().atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
+        }
+        throw new RuntimeException("Unsupported data type[" + value.getClass().getName() + "(" + value + ")" + " --> LocalDateTime]");
+    }
+}

+ 16 - 0
src/main/resources/application.properties

@@ -0,0 +1,16 @@
+# backup
+cdc.mysql.backup.host=10.0.0.229
+cdc.mysql.backup.port=3306
+cdc.mysql.backup.username=root
+cdc.mysql.backup.password=Qc_1234567
+
+# BG_New_SDK
+mysql.oldGameSystemBgNewOds.url=jdbc:mysql://10.0.0.229:3306/GameDataBgNewSdk?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+mysql.oldGameSystemBgNewOds.username=root
+mysql.oldGameSystemBgNewOds.password=Qc_1234567
+
+
+# BG_Old_SDK
+mysql.oldGameSystemBgOldOds.url=jdbc:mysql://10.0.0.229:3306/GameDataBgOldSdk?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+mysql.oldGameSystemBgOldOds.username=root
+mysql.oldGameSystemBgOldOds.password=Qc_1234567

+ 16 - 0
src/main/resources/application.test.properties

@@ -0,0 +1,16 @@
+# backup
+cdc.mysql.backup.host=118.31.103.66
+cdc.mysql.backup.port=3306
+cdc.mysql.backup.username=root
+cdc.mysql.backup.password=Qc_1234567
+
+# BG_New_SDK
+mysql.oldGameSystemBgNewOds.url=jdbc:mysql://118.31.103.66:3306/GameDataBgNewSdk?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+mysql.oldGameSystemBgNewOds.username=root
+mysql.oldGameSystemBgNewOds.password=Qc_1234567
+
+
+# BG_Old_SDK
+mysql.oldGameSystemBgOldOds.url=jdbc:mysql://118.31.103.66:3306/GameDataBgOldSdk?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+mysql.oldGameSystemBgOldOds.username=root
+mysql.oldGameSystemBgOldOds.password=Qc_1234567

+ 26 - 0
src/main/resources/log4j2.properties

@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = ERROR
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+com.qucheng.game.data.ods.dao.mapper = info