|
@@ -0,0 +1,301 @@
|
|
|
+package com.qucheng.game.data.oldsystem.ods;
|
|
|
+
|
|
|
+import com.qucheng.game.data.oldsystem.Env;
|
|
|
+import com.qucheng.game.data.oldsystem.dao.mapper.BaseMapper;
|
|
|
+import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
|
|
|
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
|
|
|
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
|
|
|
+import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
|
|
|
+import com.qucheng.game.data.oldsystem.sink.MpConfigAgentSink;
|
|
|
+import com.qucheng.game.data.oldsystem.util.DateUtil;
|
|
|
+import com.qucheng.game.data.oldsystem.util.JsonUtil;
|
|
|
+import com.qucheng.game.data.oldsystem.util.ObjectUtil;
|
|
|
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
|
|
|
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
|
|
|
+import com.zaxxer.hikari.HikariConfig;
|
|
|
+import com.zaxxer.hikari.HikariDataSource;
|
|
|
+import io.debezium.data.Envelope;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
|
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
|
|
|
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
|
|
|
+import org.apache.flink.api.common.time.Time;
|
|
|
+import org.apache.flink.api.java.tuple.Tuple3;
|
|
|
+import org.apache.flink.configuration.Configuration;
|
|
|
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
|
|
|
+import org.apache.flink.streaming.api.CheckpointingMode;
|
|
|
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
|
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
|
|
|
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
+import org.apache.flink.util.Collector;
|
|
|
+import org.apache.ibatis.mapping.Environment;
|
|
|
+import org.apache.ibatis.session.SqlSession;
|
|
|
+import org.apache.ibatis.session.SqlSessionFactory;
|
|
|
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
|
|
|
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
|
|
|
+
|
|
|
+import javax.sql.DataSource;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 广告账号、广告渠道、投手的 CDC
|
|
|
+ */
|
|
|
+public class PitcherAgentCDC {
|
|
|
+ private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
|
|
|
+ .savePath("")
|
|
|
+ .interval(300L)
|
|
|
+ .timeout(300L)
|
|
|
+ .minBetween(1L)
|
|
|
+ .build() : FlinkAppConfigParam.builder()
|
|
|
+ .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + PitcherAgentCDC.class.getSimpleName())
|
|
|
+ .interval(300L)
|
|
|
+ .timeout(300L)
|
|
|
+ .minBetween(1L)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ public static void main(String[] args) throws Exception {
|
|
|
+ System.setProperty("HADOOP_USER_NAME", "flink");
|
|
|
+
|
|
|
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
+ // 加载配置文件到 flink的全局配置中
|
|
|
+ Properties props = new Properties();
|
|
|
+ props.load(PitcherAgentCDC.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
|
|
|
+
|
|
|
+ // 设置默认并行度
|
|
|
+ env.setParallelism(1);
|
|
|
+
|
|
|
+ // 任务失败后的重启策略
|
|
|
+ // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
|
|
|
+ // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
|
|
|
+ env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
|
|
|
+
|
|
|
+ // checkpoint配置
|
|
|
+ env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
|
|
|
+ // checkpoint执行超时时间,超时则 checkpoint失败
|
|
|
+ env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
|
|
|
+ // checkpoint执行最小间隔时间
|
|
|
+ env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
|
|
|
+ // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
|
|
|
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
|
|
|
+ // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
|
|
|
+ // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
|
|
|
+ // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
|
|
|
+ // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
|
|
|
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
|
|
|
+ // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
|
|
|
+ env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
|
|
|
+ // 设置状态后端
|
|
|
+ env.setStateBackend(new HashMapStateBackend());
|
|
|
+ // 设置检查点目录
|
|
|
+ if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
|
|
|
+ env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
|
|
|
+ }
|
|
|
+
|
|
|
+ MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
|
|
|
+ .hostname(props.getProperty("cdc.mysql.backup.host"))
|
|
|
+ .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
|
|
|
+ .username(props.getProperty("cdc.mysql.backup.username"))
|
|
|
+ .password(props.getProperty("cdc.mysql.backup.password"))
|
|
|
+ .databaseList("ods")
|
|
|
+ .tableList((StringUtils.join(new String[]{
|
|
|
+ "ods.t_pitcher_agent",
|
|
|
+ }, ",")))
|
|
|
+ .deserializer(new MapDebeziumDeserializationSchema())
|
|
|
+ //5400 和 6400
|
|
|
+ .startupOptions(StartupOptions.initial())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ MysqlConfigParam bgNewSdkMysqlConfig = MysqlConfigParam.builder()
|
|
|
+ .url(props.getProperty("mysql.oldGameSystemBgNewOds.url"))
|
|
|
+ .username(props.getProperty("mysql.oldGameSystemBgNewOds.username"))
|
|
|
+ .password(props.getProperty("mysql.oldGameSystemBgNewOds.password"))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ MysqlConfigParam bgOldSdkMysqlConfig = MysqlConfigParam.builder()
|
|
|
+ .url(props.getProperty("mysql.oldGameSystemBgOldOds.url"))
|
|
|
+ .username(props.getProperty("mysql.oldGameSystemBgOldOds.username"))
|
|
|
+ .password(props.getProperty("mysql.oldGameSystemBgOldOds.password"))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ MysqlConfigParam odsMysqlConfig = MysqlConfigParam.builder()
|
|
|
+ .url(props.getProperty("mysql.ods.url"))
|
|
|
+ .username(props.getProperty("mysql.ods.username"))
|
|
|
+ .password(props.getProperty("mysql.ods.password"))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ MysqlConfigParam qcMysqlConfig = MysqlConfigParam.builder()
|
|
|
+ .url(props.getProperty("mysql.quChengText.url"))
|
|
|
+ .username(props.getProperty("mysql.quChengText.username"))
|
|
|
+ .password(props.getProperty("mysql.quChengText.password"))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ SingleOutputStreamOperator<Tuple3<String, String, TransportMap>> out = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource")
|
|
|
+ .flatMap(new TableMap(odsMysqlConfig));
|
|
|
+ out.addSink(new MpConfigAgentSink(bgNewSdkMysqlConfig, bgOldSdkMysqlConfig, qcMysqlConfig));
|
|
|
+
|
|
|
+ env.execute(PitcherAgentCDC.class.getSimpleName());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Slf4j
|
|
|
+ public static class TableMap extends RichFlatMapFunction<TransportMap, Tuple3<String, String, TransportMap>> {
|
|
|
+ private final MysqlConfigParam odsMysqlConfig;
|
|
|
+ private SqlSessionFactory odsSqlSessionFactory;
|
|
|
+
|
|
|
+ public TableMap(MysqlConfigParam odsMysqlConfig) {
|
|
|
+ this.odsMysqlConfig = odsMysqlConfig;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void open(Configuration parameters) throws Exception {
|
|
|
+ odsSqlSessionFactory = adsSqlSessionFactory(odsMysqlConfig);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void flatMap(TransportMap transportMap, Collector<Tuple3<String, String, TransportMap>> out) throws Exception {
|
|
|
+ String dbName = transportMap.getDbName();
|
|
|
+ String tableName = transportMap.getTableName();
|
|
|
+ List<String> primaryKeys = transportMap.getPrimaryKeys();
|
|
|
+ Envelope.Operation operation = transportMap.getOperation();
|
|
|
+ Map<String, Object> data = transportMap.getAfter();
|
|
|
+ if (operation == Envelope.Operation.DELETE) {
|
|
|
+ data = transportMap.getBefore();
|
|
|
+ } else {
|
|
|
+ data = transportMap.getAfter();
|
|
|
+ }
|
|
|
+ if (dbName.equals("ods")) {
|
|
|
+ if (tableName.equals("t_pitcher_agent")) {
|
|
|
+ pitcherAgentToMpConfigAgent(operation, data, out);
|
|
|
+ pitcherAgentToAdAccount(operation, data, out);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void pitcherAgentToMpConfigAgent(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
|
|
|
+ Long accountId = ObjectUtil.objToLong(data.get("account_id"));
|
|
|
+ String sourceSystem = ObjectUtil.objToString(data.get("sourceSystem"));
|
|
|
+ if (sourceSystem.contains("ZX")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (accountId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String accountType = ObjectUtil.objToString(data.get("account_type"));
|
|
|
+ Long pitcherId = ObjectUtil.objToLong(data.get("pitcher_id"));
|
|
|
+ Long gameId = ObjectUtil.objToLong(data.get("game_id"));
|
|
|
+
|
|
|
+ Map<String, Object> result = new HashMap<>(16);
|
|
|
+ result.put("advertiser_conf_id", accountId);
|
|
|
+ result.put("agent_id", data.get("id"));
|
|
|
+ result.put("sys_put_user_id", pitcherId);
|
|
|
+ result.put("app_id", gameId);
|
|
|
+ result.put("agent_name", data.get("agent_name"));
|
|
|
+ result.put("create_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(data.get("create_time"))));
|
|
|
+ result.put("update_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(data.get("create_time"))));
|
|
|
+
|
|
|
+ try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
|
|
|
+ BaseMapper mapper = session.getMapper(BaseMapper.class);
|
|
|
+ Map<String, Object> queryParam = new HashMap<>(3);
|
|
|
+
|
|
|
+ if (pitcherId != null) {
|
|
|
+ queryParam.put("source_system", sourceSystem);
|
|
|
+ queryParam.put("zx_pitcher_id", pitcherId);
|
|
|
+ Map<String, Object> putUserMap = mapper.queryOne("t_pitcher_map", queryParam);
|
|
|
+ if (putUserMap != null) {
|
|
|
+ result.put("bugu_put_user_id", putUserMap.get("bugu_pitcher_id"));
|
|
|
+ result.put("bugu_put_user_name", putUserMap.get("bugu_pitcher_name"));
|
|
|
+ result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (gameId != null) {
|
|
|
+ queryParam.put("source_system", sourceSystem);
|
|
|
+ queryParam.put("id", gameId);
|
|
|
+ queryParam.clear();
|
|
|
+ Map<String, Object> gameInfo = mapper.queryOne("t_game", queryParam);
|
|
|
+ if (gameInfo != null) {
|
|
|
+ result.put("app_name", gameInfo.get("game_name"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ out.collect(new Tuple3<>("mp_conf_agent", sourceSystem, TransportMap.builder()
|
|
|
+ .tableName("mp_conf_agent")
|
|
|
+ .primaryKeys(Arrays.asList("advertiser_conf_id", "agent_id"))
|
|
|
+ .operation(operation)
|
|
|
+ .after(result)
|
|
|
+ .build()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void pitcherAgentToAdAccount(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
|
|
|
+ Long accountId = ObjectUtil.objToLong(data.get("account_id"));
|
|
|
+ String sourceSystem = ObjectUtil.objToString(data.get("sourceSystem"));
|
|
|
+ if (sourceSystem.contains("ZX")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (accountId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String accountType = ObjectUtil.objToString(data.get("account_type"));
|
|
|
+ Long pitcherId = ObjectUtil.objToLong(data.get("pitcher_id"));
|
|
|
+ Long gameId = ObjectUtil.objToLong(data.get("game_id"));
|
|
|
+
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ result.put("account_id", accountId);
|
|
|
+ result.put("type", accountType);
|
|
|
+ result.put("create_time", ObjectUtil.objToLocalDateTime(data.get("create_time")));
|
|
|
+ result.put("update_time", ObjectUtil.objToLocalDateTime(data.get("create_time")));
|
|
|
+ result.put("start_date", ObjectUtil.objToLocalDateTime(data.get("create_time")).toLocalDate());
|
|
|
+ result.put("end_date", null);
|
|
|
+
|
|
|
+ try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
|
|
|
+ BaseMapper mapper = session.getMapper(BaseMapper.class);
|
|
|
+
|
|
|
+ Map<String, Object> queryParams = new HashMap<>(6);
|
|
|
+ if (pitcherId != null) {
|
|
|
+ queryParams.put("source_system", sourceSystem);
|
|
|
+ queryParams.put("zx_pitcher_id", pitcherId);
|
|
|
+ Map<String, Object> putUserMap = mapper.queryOne("t_pitcher_map", queryParams);
|
|
|
+ if (putUserMap != null) {
|
|
|
+ result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (gameId != null) {
|
|
|
+ queryParams.put("source_system", sourceSystem);
|
|
|
+ queryParams.put("id", gameId);
|
|
|
+ queryParams.clear();
|
|
|
+ Map<String, Object> gameInfo = mapper.queryOne("t_game", queryParams);
|
|
|
+ if (gameInfo != null) {
|
|
|
+ result.put("app_name", gameInfo.get("game_name"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ out.collect(new Tuple3<>("ad_account", sourceSystem, TransportMap.builder()
|
|
|
+ .tableName("ad_account")
|
|
|
+ .primaryKeys(Collections.singletonList("account_id"))
|
|
|
+ .operation(operation)
|
|
|
+ .after(result)
|
|
|
+ .build()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private SqlSessionFactory adsSqlSessionFactory(MysqlConfigParam mysqlConfig) {
|
|
|
+ // 初始化连接池
|
|
|
+ HikariConfig config = new HikariConfig();
|
|
|
+ config.setDriverClassName(mysqlConfig.getDriverClassName());
|
|
|
+ config.setJdbcUrl(mysqlConfig.getUrl());
|
|
|
+ config.setUsername(mysqlConfig.getUsername());
|
|
|
+ config.setPassword(mysqlConfig.getPassword());
|
|
|
+ //连接池内保留的最少连接数
|
|
|
+ config.setMinimumIdle(1);
|
|
|
+ DataSource dataSource = new HikariDataSource(config);
|
|
|
+
|
|
|
+ Environment mybatisEnv = new Environment("ads-mysql", new JdbcTransactionFactory(), dataSource);
|
|
|
+
|
|
|
+ org.apache.ibatis.session.Configuration mybatisConfig = new org.apache.ibatis.session.Configuration(mybatisEnv);
|
|
|
+ mybatisConfig.addMapper(BaseMapper.class);
|
|
|
+ return new SqlSessionFactoryBuilder().build(mybatisConfig);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|