Selaa lähdekoodia

新增 mp_agent_conf表

wcc 2 vuotta sitten
vanhempi
commit
61616fae72

+ 1 - 1
src/main/java/com/qucheng/game/data/oldsystem/Env.java

@@ -1,5 +1,5 @@
 package com.qucheng.game.data.oldsystem;
 
 public class Env {
-    public static final boolean isTest = false;
+    public static final boolean isTest = true;
 }

+ 315 - 0
src/main/java/com/qucheng/game/data/oldsystem/ods/AdAccountCDC.java

@@ -0,0 +1,315 @@
+package com.qucheng.game.data.oldsystem.ods;
+
+import com.qucheng.game.data.oldsystem.Env;
+import com.qucheng.game.data.oldsystem.dao.mapper.BaseMapper;
+import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.MpConfigAgentSink;
+import com.qucheng.game.data.oldsystem.util.DateUtil;
+import com.qucheng.game.data.oldsystem.util.JsonUtil;
+import com.qucheng.game.data.oldsystem.util.ObjectUtil;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.data.Envelope;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import javax.sql.DataSource;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 广告账号、广告渠道、投手的 CDC
+ */
+public class AdAccountCDC {
+    private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
+            .savePath("")
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build() : FlinkAppConfigParam.builder()
+            .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + AdAccountCDC.class.getSimpleName())
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build();
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("HADOOP_USER_NAME", "flink");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 加载配置文件到 flink的全局配置中
+        Properties props = new Properties();
+        props.load(AdAccountCDC.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+
+        // 设置默认并行度
+        env.setParallelism(1);
+
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
+
+        // checkpoint配置
+        env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
+        // checkpoint执行超时时间,超时则 checkpoint失败
+        env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
+        // checkpoint执行最小间隔时间
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
+        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
+        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
+        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
+        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+        // 设置状态后端
+        env.setStateBackend(new HashMapStateBackend());
+        // 设置检查点目录
+        if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
+            env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
+        }
+
+        MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
+                .hostname(props.getProperty("cdc.mysql.zx.host"))
+                .port(StringUtils.isBlank(props.getProperty("cdc.mysql.zx.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.zx.port")))
+                .username(props.getProperty("cdc.mysql.zx.username"))
+                .password(props.getProperty("cdc.mysql.zx.password"))
+                .databaseList("zx-advertising-oceanengine")
+                .tableList((StringUtils.join(new String[]{
+                        "zx-advertising-oceanengine.t_clue_wechat_game",
+                        "zx-advertising-oceanengine.t_ad_account",
+                }, ",")))
+                .deserializer(new MapDebeziumDeserializationSchema())
+                .startupOptions(StartupOptions.initial())
+                .build();
+
+        MysqlConfigParam bgNewSdkMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.oldGameSystemBgNewOds.url"))
+                .username(props.getProperty("mysql.oldGameSystemBgNewOds.username"))
+                .password(props.getProperty("mysql.oldGameSystemBgNewOds.password"))
+                .build();
+
+        MysqlConfigParam bgOldSdkMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.oldGameSystemBgOldOds.url"))
+                .username(props.getProperty("mysql.oldGameSystemBgOldOds.username"))
+                .password(props.getProperty("mysql.oldGameSystemBgOldOds.password"))
+                .build();
+
+        MysqlConfigParam odsMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.ods.url"))
+                .username(props.getProperty("mysql.ods.username"))
+                .password(props.getProperty("mysql.ods.password"))
+                .build();
+
+        MysqlConfigParam qcMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.quChengText.url"))
+                .username(props.getProperty("mysql.quChengText.username"))
+                .password(props.getProperty("mysql.quChengText.password"))
+                .build();
+
+        env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource")
+                .flatMap(new TableMap(odsMysqlConfig))
+                .addSink(new MpConfigAgentSink(bgNewSdkMysqlConfig, bgOldSdkMysqlConfig, qcMysqlConfig));
+
+        env.execute(AdAccountCDC.class.getSimpleName());
+    }
+
+    @Slf4j
+    public static class TableMap extends RichFlatMapFunction<TransportMap, Tuple3<String, String, TransportMap>> {
+        private final MysqlConfigParam odsMysqlConfig;
+        private SqlSessionFactory odsSqlSessionFactory;
+
+        public TableMap(MysqlConfigParam odsMysqlConfig) {
+            this.odsMysqlConfig = odsMysqlConfig;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            odsSqlSessionFactory = adsSqlSessionFactory(odsMysqlConfig);
+        }
+
+        @Override
+        public void flatMap(TransportMap transportMap, Collector<Tuple3<String, String, TransportMap>> out) throws Exception {
+            String dbName = transportMap.getDbName();
+            String tableName = transportMap.getTableName();
+            List<String> primaryKeys = transportMap.getPrimaryKeys();
+            Envelope.Operation operation = transportMap.getOperation();
+            Map<String, Object> data = transportMap.getAfter();
+            if (operation == Envelope.Operation.DELETE) {
+                data = transportMap.getBefore();
+            } else {
+                data = transportMap.getAfter();
+            }
+            if (dbName.equals("zx-advertising-oceanengine")) {
+                if (tableName.equals("t_clue_wechat_game")) {
+                    Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+                    String path = ObjectUtil.objToString(data.get("path"));
+                    if (StringUtils.isBlank(path)) {
+                        return;
+                    }
+                    String agentKey = path.replace("?state=", "");
+                    try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
+                        BaseMapper mapper = session.getMapper(BaseMapper.class);
+
+                        Map<String, Object> queryParam = new HashMap<>(3);
+                        queryParam.put("agent_key", agentKey);
+                        Map<String, Object> agentInfo = mapper.queryOne("t_pitcher_agent", queryParam);
+                        queryParam.clear();
+                        if (agentInfo == null) {
+                            Map<String, Object> temp = new HashMap<>();
+                            temp.put("create_time", data.get("create_time"));
+                            temp.put("user_name", data.get("user_name"));
+                            temp.put("path", data.get("path"));
+                            temp.put("account_id", data.get("account_id"));
+                            temp.put("instance_id", data.get("instance_id"));
+                            temp.put("name", data.get("name"));
+                            System.out.println(JsonUtil.toString(temp));
+                            return;
+                        }
+
+                        String sourceSystem = ObjectUtil.objToString(agentInfo.get("source_system"));
+                        if (sourceSystem.contains("ZX")) {
+                            return;
+                        }
+                        Long zxPutUserId = ObjectUtil.objToLong(agentInfo.get("pitcher_id"));
+                        Long gameId = ObjectUtil.objToLong(agentInfo.get("game_id"));
+                        Map<String, Object> putUserMap = null;
+                        if (zxPutUserId != null) {
+                            queryParam.put("source_system", sourceSystem);
+                            queryParam.put("zx_pitcher_id", zxPutUserId);
+                            putUserMap = mapper.queryOne("t_pitcher_map", queryParam);
+                            queryParam.clear();
+                        }
+                        Map<String, Object> gameInfo = null;
+                        if (gameId != null) {
+                            queryParam.put("source_system", sourceSystem);
+                            queryParam.put("id", gameId);
+                            gameInfo = mapper.queryOne("t_game", queryParam);
+                            queryParam.clear();
+                        }
+                        Map<String, Object> result = new HashMap<>(16);
+                        result.put("advertiser_conf_id", agentInfo.get("account_id"));
+                        result.put("agent_id", agentInfo.get("id"));
+                        result.put("sys_put_user_id", zxPutUserId);
+                        result.put("app_id", gameId);
+                        result.put("create_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
+                        result.put("update_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
+                        if (putUserMap != null) {
+                            result.put("bugu_put_user_id", putUserMap.get("bugu_pitcher_id"));
+                            result.put("bugu_put_user_name", putUserMap.get("bugu_pitcher_name"));
+                            result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
+                        }
+                        if (gameInfo != null) {
+                            result.put("app_name", gameInfo.get("game_name"));
+                        }
+
+                        out.collect(new Tuple3<>("mp_conf_agent", sourceSystem, TransportMap.builder()
+                                .tableName("mp_conf_agent")
+                                .primaryKeys(Arrays.asList("advertiser_conf_id", "agent_id"))
+                                .operation(operation)
+                                .after(result)
+                                .build()));
+                    }
+                    return;
+                }
+                if (tableName.equals("t_ad_account")) {
+                    Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+                    try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
+                        BaseMapper mapper = session.getMapper(BaseMapper.class);
+
+                        Map<String, Object> queryParams = new HashMap<>(6);
+                        queryParams.put("account_id", accountId);
+                        List<Map<String, Object>> agentList = mapper.queryList("t_pitcher_agent", queryParams);
+                        queryParams.clear();
+                        if (agentList == null || agentList.isEmpty()) {
+                            return;
+                        }
+                        String sourceSystem = ObjectUtil.objToString(agentList.get(0).get("source_system"));
+                        Long putUserId = ObjectUtil.objToLong(agentList.get(0).get("pitcher_id"));
+                        Long gameId = ObjectUtil.objToLong(agentList.get(0).get("game_id"));
+                        LocalDateTime createTime = ObjectUtil.objToLocalDateTime(agentList.get(0).get("create_time"));
+
+                        Map<String, Object> putUserMap = null;
+                        if (putUserId != null) {
+                            queryParams.put("source_system", sourceSystem);
+                            queryParams.put("zx_pitcher_id", putUserId);
+                            putUserMap = mapper.queryOne("t_pitcher_map", queryParams);
+                            queryParams.clear();
+                        }
+                        Map<String, Object> gameInfo = null;
+                        if (gameId != null) {
+                            queryParams.put("source_system", sourceSystem);
+                            queryParams.put("id", gameId);
+                            gameInfo = mapper.queryOne("t_game", queryParams);
+                            queryParams.clear();
+                        }
+                        Map<String, Object> result = new HashMap<>();
+                        result.put("account_id", accountId);
+                        result.put("type", "BYTE");
+                        result.put("create_time", createTime);
+                        result.put("update_time", createTime);
+                        result.put("start_date", createTime.toLocalDate());
+                        result.put("end_date", null);
+                        if (putUserMap != null) {
+                            result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
+                        }
+                        if (gameInfo != null) {
+                            result.put("app_name", gameInfo.get("game_name"));
+                        }
+                        out.collect(new Tuple3<>("ad_account", sourceSystem, TransportMap.builder()
+                                .tableName("ad_account")
+                                .primaryKeys(Collections.singletonList("account_id"))
+                                .operation(operation)
+                                .after(result)
+                                .build()));
+                    }
+                    return;
+                }
+            }
+            System.out.println("未知的数据--->" + JsonUtil.toString(transportMap));
+        }
+
+        private SqlSessionFactory adsSqlSessionFactory(MysqlConfigParam mysqlConfig) {
+            // 初始化连接池
+            HikariConfig config = new HikariConfig();
+            config.setDriverClassName(mysqlConfig.getDriverClassName());
+            config.setJdbcUrl(mysqlConfig.getUrl());
+            config.setUsername(mysqlConfig.getUsername());
+            config.setPassword(mysqlConfig.getPassword());
+            //连接池内保留的最少连接数
+            config.setMinimumIdle(1);
+            DataSource dataSource = new HikariDataSource(config);
+
+            Environment mybatisEnv = new Environment("ads-mysql", new JdbcTransactionFactory(), dataSource);
+
+            org.apache.ibatis.session.Configuration mybatisConfig = new org.apache.ibatis.session.Configuration(mybatisEnv);
+            mybatisConfig.addMapper(BaseMapper.class);
+            return new SqlSessionFactoryBuilder().build(mybatisConfig);
+        }
+    }
+}

+ 97 - 0
src/main/java/com/qucheng/game/data/oldsystem/sink/MpConfigAgentSink.java

@@ -0,0 +1,97 @@
+package com.qucheng.game.data.oldsystem.sink;
+
+import com.qucheng.game.data.oldsystem.dao.mapper.BaseMapper;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import javax.sql.DataSource;
+
+public class MpConfigAgentSink extends RichSinkFunction<Tuple3<String, String, TransportMap>> {
+    private final MysqlConfigParam bgNewMysqlConfigParam;
+    private final MysqlConfigParam bgOldMysqlConfigParam;
+    private final MysqlConfigParam qcMysqlConfigParam;
+    protected SqlSessionFactory bgNewSqlSessionFactory;
+    protected SqlSessionFactory bgOldSqlSessionFactory;
+    protected SqlSessionFactory qcSqlSessionFactory;
+
+    public MpConfigAgentSink(MysqlConfigParam bgNewMysqlConfigParam, MysqlConfigParam bgOldMysqlConfigParam, MysqlConfigParam qcMysqlConfigParam) {
+        this.bgNewMysqlConfigParam = bgNewMysqlConfigParam;
+        this.bgOldMysqlConfigParam = bgOldMysqlConfigParam;
+        this.qcMysqlConfigParam = qcMysqlConfigParam;
+    }
+
+    @Override
+    public void open(Configuration configuration) {
+        bgNewSqlSessionFactory = sqlSessionFactory(bgNewMysqlConfigParam);
+        bgOldSqlSessionFactory = sqlSessionFactory(bgOldMysqlConfigParam);
+        qcSqlSessionFactory = sqlSessionFactory(qcMysqlConfigParam);
+    }
+
+    @Override
+    public void invoke(Tuple3<String, String, TransportMap> tuple3, Context context) {
+        switch (tuple3.f0) {
+            case "mp_conf_agent":
+                invokeMpConfigAgent(tuple3.f1, tuple3.f2);
+                break;
+            case "ad_account":
+                invokeAdAccount(tuple3.f1, tuple3.f2);
+                break;
+            default:
+                break;
+        }
+    }
+
+    private void invokeMpConfigAgent(String sdkSource, TransportMap value) {
+        if (sdkSource.equals("BG_NEW")) {
+            try (SqlSession session = bgNewSqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+                mapper.saveOrUpdate(value.getTableName(), value.getPrimaryKeys(), value.getAfter());
+            }
+        } else {
+            try (SqlSession session = bgOldSqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+                mapper.saveOrUpdate(value.getTableName(), value.getPrimaryKeys(), value.getAfter());
+            }
+        }
+    }
+
+    private void invokeAdAccount(String sourceSystem, TransportMap value) {
+        try (SqlSession session = qcSqlSessionFactory.openSession(true)) {
+            BaseMapper mapper = session.getMapper(BaseMapper.class);
+            mapper.saveOrUpdate(value.getTableName(), value.getPrimaryKeys(), value.getAfter());
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private SqlSessionFactory sqlSessionFactory(MysqlConfigParam mysqlConfig) {
+        // 初始化连接池
+        HikariConfig config = new HikariConfig();
+        config.setDriverClassName(mysqlConfig.getDriverClassName());
+        config.setJdbcUrl(mysqlConfig.getUrl());
+        config.setUsername(mysqlConfig.getUsername());
+        config.setPassword(mysqlConfig.getPassword());
+        //连接池内保留的最少连接数
+        config.setMinimumIdle(1);
+        DataSource dataSource = new HikariDataSource(config);
+
+        Environment mybatisEnv = new Environment("ads-mysql", new JdbcTransactionFactory(), dataSource);
+
+        org.apache.ibatis.session.Configuration mybatisConfig = new org.apache.ibatis.session.Configuration(mybatisEnv);
+        mybatisConfig.addMapper(BaseMapper.class);
+        return new SqlSessionFactoryBuilder().build(mybatisConfig);
+    }
+}

+ 13 - 1
src/main/resources/application.test.properties

@@ -16,7 +16,19 @@ mysql.oldGameSystemBgOldOds.username=root
 mysql.oldGameSystemBgOldOds.password=Qc_1234567
 
 
+# game-ods
+mysql.ods.url=jdbc:mysql://118.31.103.66:3306/ods?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+mysql.ods.username=root
+mysql.ods.password=Qc_1234567
+
+
 # quChengText
 mysql.quChengText.url=jdbc:mysql://qc-game-cluster.rwlb.rds.aliyuncs.com:3306/quchen_text?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
 mysql.quChengText.username=qc
-mysql.quChengText.password=Qc_1234567
+mysql.quChengText.password=Qc_1234567
+
+# cdc-zx
+cdc.mysql.zx.host=rm-bp145mi6r24ik50z5xo.mysql.rds.aliyuncs.com
+cdc.mysql.zx.port=3306
+cdc.mysql.zx.username=super_data
+cdc.mysql.zx.password=data@2023ff