Browse Source

同步腾讯数据

wcc 2 years ago
parent
commit
d63f6dd2df

+ 272 - 127
src/main/java/com/qucheng/game/data/oldsystem/ods/AdAccountCDC.java

@@ -66,7 +66,7 @@ public class AdAccountCDC {
         props.load(AdAccountCDC.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
 
         // 设置默认并行度
-        env.setParallelism(1);
+        env.setParallelism(3);
 
         // 任务失败后的重启策略
         // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
@@ -104,6 +104,8 @@ public class AdAccountCDC {
                 .tableList((StringUtils.join(new String[]{
                         "zx-advertising-oceanengine.t_clue_wechat_game",
                         "zx-advertising-oceanengine.t_ad_account",
+                        "zx-advertising-tencent.t_ad_account",
+                        "zx-advertising-tencent.t_adcreative",
                 }, ",")))
                 .deserializer(new MapDebeziumDeserializationSchema())
                 .startupOptions(StartupOptions.initial())
@@ -156,8 +158,6 @@ public class AdAccountCDC {
 
         @Override
         public void flatMap(TransportMap transportMap, Collector<Tuple3<String, String, TransportMap>> out) throws Exception {
-
-            System.out.println(transportMap);
             String dbName = transportMap.getDbName();
             String tableName = transportMap.getTableName();
             List<String> primaryKeys = transportMap.getPrimaryKeys();
@@ -170,136 +170,281 @@ public class AdAccountCDC {
             }
             if (dbName.equals("zx-advertising-oceanengine")) {
                 if (tableName.equals("t_clue_wechat_game")) {
-                    Long accountId = ObjectUtil.objToLong(data.get("account_id"));
-                    String path = ObjectUtil.objToString(data.get("path"));
-                    if (StringUtils.isBlank(path)) {
-                        return;
-                    }
-                    String agentKey = path.replace("?state=", "");
-                    try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
-                        BaseMapper mapper = session.getMapper(BaseMapper.class);
-
-                        Map<String, Object> queryParam = new HashMap<>(3);
-                        queryParam.put("agent_key", agentKey);
-                        Map<String, Object> agentInfo = mapper.queryOne("t_pitcher_agent", queryParam);
-                        queryParam.clear();
-                        if (agentInfo == null) {
-                            Map<String, Object> temp = new HashMap<>();
-                            temp.put("create_time", data.get("create_time"));
-                            temp.put("user_name", data.get("user_name"));
-                            temp.put("path", data.get("path"));
-                            temp.put("account_id", data.get("account_id"));
-                            temp.put("instance_id", data.get("instance_id"));
-                            temp.put("name", data.get("name"));
-                            System.out.println(JsonUtil.toString(temp));
-                            return;
-                        }
-
-                        String sourceSystem = ObjectUtil.objToString(agentInfo.get("source_system"));
-                        if (sourceSystem.contains("ZX")) {
-                            return;
-                        }
-                        Long zxPutUserId = ObjectUtil.objToLong(agentInfo.get("pitcher_id"));
-                        Long gameId = ObjectUtil.objToLong(agentInfo.get("game_id"));
-                        Map<String, Object> putUserMap = null;
-                        if (zxPutUserId != null) {
-                            queryParam.put("source_system", sourceSystem);
-                            queryParam.put("zx_pitcher_id", zxPutUserId);
-                            putUserMap = mapper.queryOne("t_pitcher_map", queryParam);
-                            queryParam.clear();
-                        }
-                        Map<String, Object> gameInfo = null;
-                        if (gameId != null) {
-                            queryParam.put("source_system", sourceSystem);
-                            queryParam.put("id", gameId);
-                            gameInfo = mapper.queryOne("t_game", queryParam);
-                            queryParam.clear();
-                        }
-                        Map<String, Object> result = new HashMap<>(16);
-                        result.put("advertiser_conf_id", agentInfo.get("account_id"));
-                        result.put("agent_id", agentInfo.get("id"));
-                        result.put("sys_put_user_id", zxPutUserId);
-                        result.put("app_id", gameId);
-                        result.put("agent_name", agentInfo.get("agent_name"));
-                        result.put("create_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
-                        result.put("update_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
-                        if (putUserMap != null) {
-                            result.put("bugu_put_user_id", putUserMap.get("bugu_pitcher_id"));
-                            result.put("bugu_put_user_name", putUserMap.get("bugu_pitcher_name"));
-                            result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
-                        }
-                        if (gameInfo != null) {
-                            result.put("app_name", gameInfo.get("game_name"));
-                        }
-
-                        out.collect(new Tuple3<>("mp_conf_agent", sourceSystem, TransportMap.builder()
-                                .tableName("mp_conf_agent")
-                                .primaryKeys(Arrays.asList("advertiser_conf_id", "agent_id"))
-                                .operation(operation)
-                                .after(result)
-                                .build()));
-                    }
+                    oceanenginePitcherAgent(operation, data, out);
+                } else if (tableName.equals("t_ad_account")) {
+                    oceanengineAccount(operation, data, out);
+                }
+            } else if ("zx-advertising-tencent".equals(dbName)) {
+                if (tableName.equals("t_adcreative")) {
+                    tencentPitcherAgent(operation, data, out);
+                } else if (tableName.equals("t_ad_account")) {
+                    tencentAccount(operation, data, out);
+                }
+            }
+        }
+
+        private void oceanenginePitcherAgent(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+            String path = ObjectUtil.objToString(data.get("path"));
+            if (StringUtils.isBlank(path)) {
+                return;
+            }
+            String agentKey = path.replace("?state=", "");
+            try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+
+                Map<String, Object> queryParam = new HashMap<>(3);
+                queryParam.put("agent_key", agentKey);
+                Map<String, Object> agentInfo = mapper.queryOne("t_pitcher_agent", queryParam);
+                queryParam.clear();
+                if (agentInfo == null || ObjectUtil.objToLong(agentInfo.get("account_id")) == null
+                        || ObjectUtil.objToLong(agentInfo.get("account_id")) == 0) {
+                    Map<String, Object> temp = new HashMap<>();
+                    temp.put("create_time", data.get("create_time"));
+                    temp.put("user_name", data.get("user_name"));
+                    temp.put("path", data.get("path"));
+                    temp.put("account_id", data.get("account_id"));
+                    temp.put("instance_id", data.get("instance_id"));
+                    temp.put("name", data.get("name"));
+                    return;
+                }
+
+                String sourceSystem = ObjectUtil.objToString(agentInfo.get("source_system"));
+                if (sourceSystem.contains("ZX")) {
+                    return;
+                }
+                Long zxPutUserId = ObjectUtil.objToLong(agentInfo.get("pitcher_id"));
+                Long gameId = ObjectUtil.objToLong(agentInfo.get("game_id"));
+                Map<String, Object> putUserMap = null;
+                if (zxPutUserId != null) {
+                    queryParam.put("source_system", sourceSystem);
+                    queryParam.put("zx_pitcher_id", zxPutUserId);
+                    putUserMap = mapper.queryOne("t_pitcher_map", queryParam);
+                    queryParam.clear();
+                }
+                Map<String, Object> gameInfo = null;
+                if (gameId != null) {
+                    queryParam.put("source_system", sourceSystem);
+                    queryParam.put("id", gameId);
+                    gameInfo = mapper.queryOne("t_game", queryParam);
+                    queryParam.clear();
+                }
+                Map<String, Object> result = new HashMap<>(16);
+                result.put("advertiser_conf_id", agentInfo.get("account_id"));
+                result.put("agent_id", agentInfo.get("id"));
+                result.put("sys_put_user_id", zxPutUserId);
+                result.put("app_id", gameId);
+                result.put("agent_name", agentInfo.get("agent_name"));
+                result.put("create_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
+                result.put("update_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
+                if (putUserMap != null) {
+                    result.put("bugu_put_user_id", putUserMap.get("bugu_pitcher_id"));
+                    result.put("bugu_put_user_name", putUserMap.get("bugu_pitcher_name"));
+                    result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
+                }
+                if (gameInfo != null) {
+                    result.put("app_name", gameInfo.get("game_name"));
+                }
+
+                out.collect(new Tuple3<>("mp_conf_agent", sourceSystem, TransportMap.builder()
+                        .tableName("mp_conf_agent")
+                        .primaryKeys(Arrays.asList("advertiser_conf_id", "agent_id"))
+                        .operation(operation)
+                        .after(result)
+                        .build()));
+            }
+        }
+
+        private void oceanengineAccount(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+
+            try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+
+                Map<String, Object> queryParams = new HashMap<>(6);
+                queryParams.put("account_id", accountId);
+                List<Map<String, Object>> agentList = mapper.queryList("t_pitcher_agent", queryParams);
+                queryParams.clear();
+                if (agentList != null) {
+                    agentList = agentList.stream().filter(map -> StringUtils.isNotBlank(ObjectUtil.objToString(map.get("agent_key")))).collect(Collectors.toList());
+                }
+                if (agentList == null || agentList.isEmpty()) {
                     return;
                 }
-                if (tableName.equals("t_ad_account")) {
-                    Long accountId = ObjectUtil.objToLong(data.get("account_id"));
-
-                    try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
-                        BaseMapper mapper = session.getMapper(BaseMapper.class);
-
-                        Map<String, Object> queryParams = new HashMap<>(6);
-                        queryParams.put("account_id", accountId);
-                        List<Map<String, Object>> agentList = mapper.queryList("t_pitcher_agent", queryParams);
-                        queryParams.clear();
-                        if(agentList != null) {
-                            agentList = agentList.stream().filter(map -> StringUtils.isNotBlank(ObjectUtil.objToString(map.get("agent_key")))).collect(Collectors.toList());
-                        }
-                        if (agentList == null || agentList.isEmpty()) {
-                            return;
-                        }
-                        String sourceSystem = ObjectUtil.objToString(agentList.get(0).get("source_system"));
-                        Long putUserId = ObjectUtil.objToLong(agentList.get(0).get("pitcher_id"));
-                        Long gameId = ObjectUtil.objToLong(agentList.get(0).get("game_id"));
-                        LocalDateTime createTime = ObjectUtil.objToLocalDateTime(agentList.get(0).get("create_time"));
-
-                        Map<String, Object> putUserMap = null;
-                        if (putUserId != null) {
-                            queryParams.put("source_system", sourceSystem);
-                            queryParams.put("zx_pitcher_id", putUserId);
-                            putUserMap = mapper.queryOne("t_pitcher_map", queryParams);
-                            queryParams.clear();
-                        }
-                        Map<String, Object> gameInfo = null;
-                        if (gameId != null) {
-                            queryParams.put("source_system", sourceSystem);
-                            queryParams.put("id", gameId);
-                            gameInfo = mapper.queryOne("t_game", queryParams);
-                            queryParams.clear();
-                        }
-                        Map<String, Object> result = new HashMap<>();
-                        result.put("account_id", accountId);
-                        result.put("type", "BYTE");
-                        result.put("create_time", createTime);
-                        result.put("update_time", createTime);
-                        result.put("start_date", createTime.toLocalDate());
-                        result.put("end_date", null);
-                        if (putUserMap != null) {
-                            result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
-                        }
-                        if (gameInfo != null) {
-                            result.put("app_name", gameInfo.get("game_name"));
-                        }
-                        out.collect(new Tuple3<>("ad_account", sourceSystem, TransportMap.builder()
-                                .tableName("ad_account")
-                                .primaryKeys(Collections.singletonList("account_id"))
-                                .operation(operation)
-                                .after(result)
-                                .build()));
+                String sourceSystem = ObjectUtil.objToString(agentList.get(0).get("source_system"));
+                Long putUserId = ObjectUtil.objToLong(agentList.get(0).get("pitcher_id"));
+                Long gameId = ObjectUtil.objToLong(agentList.get(0).get("game_id"));
+                LocalDateTime createTime = ObjectUtil.objToLocalDateTime(agentList.get(0).get("create_time"));
+
+                Map<String, Object> putUserMap = null;
+                if (putUserId != null) {
+                    queryParams.put("source_system", sourceSystem);
+                    queryParams.put("zx_pitcher_id", putUserId);
+                    putUserMap = mapper.queryOne("t_pitcher_map", queryParams);
+                    queryParams.clear();
+                }
+                Map<String, Object> gameInfo = null;
+                if (gameId != null) {
+                    queryParams.put("source_system", sourceSystem);
+                    queryParams.put("id", gameId);
+                    gameInfo = mapper.queryOne("t_game", queryParams);
+                    queryParams.clear();
+                }
+                Map<String, Object> result = new HashMap<>();
+                result.put("account_id", accountId);
+                result.put("type", "BYTE");
+                result.put("create_time", createTime);
+                result.put("update_time", createTime);
+                result.put("start_date", createTime.toLocalDate());
+                result.put("end_date", null);
+                if (putUserMap != null) {
+                    result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
+                }
+                if (gameInfo != null) {
+                    result.put("app_name", gameInfo.get("game_name"));
+                }
+                out.collect(new Tuple3<>("ad_account", sourceSystem, TransportMap.builder()
+                        .tableName("ad_account")
+                        .primaryKeys(Collections.singletonList("account_id"))
+                        .operation(operation)
+                        .after(result)
+                        .build()));
+            }
+        }
+
+        private void tencentPitcherAgent(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+            String pageSpecJson = ObjectUtil.objToString(data.get("page_spec_json"));
+            String path = null;
+            if (StringUtils.isBlank(pageSpecJson)) {
+                return;
+            } else {
+                try {
+                    Map<String, Object> map = JsonUtil.toObj(pageSpecJson, Map.class);
+                    Map temp = (Map) map.get("mini_game_spec");
+                    if (temp != null) {
+                        path = (String) temp.get("mini_game_tracking_parameter");
                     }
+                } catch (Exception e) {
+                }
+            }
+            if (StringUtils.isBlank(path)) {
+                return;
+            }
+            String agentKey = path.replace("?state\\u003d", "");
+            try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+
+                Map<String, Object> queryParam = new HashMap<>(3);
+                queryParam.put("agent_key", agentKey);
+                Map<String, Object> agentInfo = mapper.queryOne("t_pitcher_agent", queryParam);
+                queryParam.clear();
+                if (agentInfo == null || ObjectUtil.objToLong(agentInfo.get("account_id")) == null
+                        || ObjectUtil.objToLong(agentInfo.get("account_id")) == 0) {
+                    return;
+                }
+
+                String sourceSystem = ObjectUtil.objToString(agentInfo.get("source_system"));
+                if (sourceSystem.contains("ZX")) {
+                    return;
+                }
+                Long zxPutUserId = ObjectUtil.objToLong(agentInfo.get("pitcher_id"));
+                Long gameId = ObjectUtil.objToLong(agentInfo.get("game_id"));
+                Map<String, Object> putUserMap = null;
+                if (zxPutUserId != null) {
+                    queryParam.put("source_system", sourceSystem);
+                    queryParam.put("zx_pitcher_id", zxPutUserId);
+                    putUserMap = mapper.queryOne("t_pitcher_map", queryParam);
+                    queryParam.clear();
+                }
+                Map<String, Object> gameInfo = null;
+                if (gameId != null) {
+                    queryParam.put("source_system", sourceSystem);
+                    queryParam.put("id", gameId);
+                    gameInfo = mapper.queryOne("t_game", queryParam);
+                    queryParam.clear();
+                }
+                Map<String, Object> result = new HashMap<>(16);
+                result.put("advertiser_conf_id", agentInfo.get("account_id"));
+                result.put("agent_id", agentInfo.get("id"));
+                result.put("sys_put_user_id", zxPutUserId);
+                result.put("app_id", gameId);
+                result.put("agent_name", agentInfo.get("agent_name"));
+                result.put("create_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
+                result.put("update_time", DateUtil.localDateTimeToSecond(ObjectUtil.objToLocalDateTime(agentInfo.get("create_time"))));
+                if (putUserMap != null) {
+                    result.put("bugu_put_user_id", putUserMap.get("bugu_pitcher_id"));
+                    result.put("bugu_put_user_name", putUserMap.get("bugu_pitcher_name"));
+                    result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
+                }
+                if (gameInfo != null) {
+                    result.put("app_name", gameInfo.get("game_name"));
+                }
+
+                out.collect(new Tuple3<>("mp_conf_agent", sourceSystem, TransportMap.builder()
+                        .tableName("mp_conf_agent")
+                        .primaryKeys(Arrays.asList("advertiser_conf_id", "agent_id"))
+                        .operation(operation)
+                        .after(result)
+                        .build()));
+            }
+        }
+
+        private void tencentAccount(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+
+            try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
+                BaseMapper mapper = session.getMapper(BaseMapper.class);
+
+                Map<String, Object> queryParams = new HashMap<>(6);
+                queryParams.put("account_id", accountId);
+                List<Map<String, Object>> agentList = mapper.queryList("t_pitcher_agent", queryParams);
+                queryParams.clear();
+                if (agentList != null) {
+                    agentList = agentList.stream().filter(map -> StringUtils.isNotBlank(ObjectUtil.objToString(map.get("agent_key")))).collect(Collectors.toList());
+                }
+                if (agentList == null || agentList.isEmpty()) {
                     return;
                 }
+                String sourceSystem = ObjectUtil.objToString(agentList.get(0).get("source_system"));
+                Long putUserId = ObjectUtil.objToLong(agentList.get(0).get("pitcher_id"));
+                Long gameId = ObjectUtil.objToLong(agentList.get(0).get("game_id"));
+                LocalDateTime createTime = ObjectUtil.objToLocalDateTime(agentList.get(0).get("create_time"));
+
+                Map<String, Object> putUserMap = null;
+                if (putUserId != null) {
+                    queryParams.put("source_system", sourceSystem);
+                    queryParams.put("zx_pitcher_id", putUserId);
+                    putUserMap = mapper.queryOne("t_pitcher_map", queryParams);
+                    queryParams.clear();
+                }
+                Map<String, Object> gameInfo = null;
+                if (gameId != null) {
+                    queryParams.put("source_system", sourceSystem);
+                    queryParams.put("id", gameId);
+                    gameInfo = mapper.queryOne("t_game", queryParams);
+                    queryParams.clear();
+                }
+                Map<String, Object> result = new HashMap<>();
+                result.put("account_id", accountId);
+                result.put("type", "BYTE");
+                result.put("create_time", createTime);
+                result.put("update_time", createTime);
+                result.put("start_date", createTime.toLocalDate());
+                result.put("end_date", null);
+                if (putUserMap != null) {
+                    result.put("sys_put_user_name", putUserMap.get("zx_pitcher_name"));
+                }
+                if (gameInfo != null) {
+                    result.put("app_name", gameInfo.get("game_name"));
+                }
+                out.collect(new Tuple3<>("ad_account", sourceSystem, TransportMap.builder()
+                        .tableName("ad_account")
+                        .primaryKeys(Collections.singletonList("account_id"))
+                        .operation(operation)
+                        .after(result)
+                        .build()));
             }
-            System.out.println("未知的数据--->" + JsonUtil.toString(transportMap));
         }
 
         private SqlSessionFactory adsSqlSessionFactory(MysqlConfigParam mysqlConfig) {

+ 0 - 165
src/main/java/com/qucheng/game/data/oldsystem/ods/ByteDailyCost.java

@@ -1,165 +0,0 @@
-package com.qucheng.game.data.oldsystem.ods;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.qucheng.game.data.oldsystem.Env;
-import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
-import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema;
-import com.qucheng.game.data.oldsystem.sink.ads_sink;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ODS层数据填充
- * 过滤字段及转换字段
- */
-public class ByteDailyCost {
-    private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
-            .savePath("")
-            .interval(300L)
-            .timeout(300L)
-            .minBetween(1L)
-            .build() : FlinkAppConfigParam.builder()
-            .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + ByteDailyCost.class.getSimpleName())
-            .interval(300L)
-            .timeout(300L)
-            .minBetween(1L)
-            .build();
-
-    public static void main(String[] args) throws Exception {
-        System.setProperty("HADOOP_USER_NAME", "flink");
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        // 加载配置文件到 flink的全局配置中
-        Properties props = new Properties();
-        props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
-        Configuration configuration = new Configuration();
-        props.stringPropertyNames().forEach(key -> {
-            String value = props.getProperty(key);
-            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
-        });
-        env.getConfig().setGlobalJobParameters(configuration);
-        // 设置默认并行度
-        env.setParallelism(1);
-
-        // 任务失败后的重启策略
-        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
-        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
-        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
-
-        // checkpoint配置
-        env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
-        // checkpoint执行超时时间,超时则 checkpoint失败
-        env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
-        // checkpoint执行最小间隔时间
-        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
-        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
-        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
-        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
-        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
-        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
-        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-        // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
-        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
-        // 设置状态后端
-        env.setStateBackend(new HashMapStateBackend());
-        // 设置检查点目录
-        if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
-            env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
-        }
-
-        MySqlSource<String> mysqlCDCSource = MySqlSource.<String>builder()
-                .hostname(props.getProperty("cdc.mysql.backup.host"))
-                .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
-                .username(props.getProperty("cdc.mysql.backup.username"))
-                .password(props.getProperty("cdc.mysql.backup.password"))
-                .databaseList("ods")
-                .tableList((StringUtils.join(new String[]{
-                        "ods.byte_t_ad_data_day"
-                }, ",")))
-                .deserializer(new CustomerDeserializationSchema())
-                //5400 和 6400
-                .startupOptions(StartupOptions.initial())
-                .build();
-
-        DataStreamSource<String> dataStreamSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source");
-
-        dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink());
-
-        //4.启动任务
-        env.execute("ByteDailyCost");
-    }
-
-    public static class FieldFilterMap implements MapFunction<String, JSONObject> {
-        @Override
-        public JSONObject map(String value) throws Exception {
-
-            JSONObject result = new JSONObject();
-            JSONObject data = JSON.parseObject(value);
-
-            String tableName = data.getString("tableName");
-            JSONObject after = data.getJSONObject("after");
-            JSONObject key = data.getJSONObject("key");
-            String afterFinal = "";
-            String keyFinal = "";
-            String tableNameFinal = "";
-
-            if (tableName.equals("byte_t_ad_data_day")) {
-
-                List<String> columns = Arrays.asList("account_id","ad_id", "day",
-                        "cost", "show_count", "click", "ctr","active","belong_version");
-
-                after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
-
-                BigDecimal cost = after.getBigDecimal("cost");
-
-//                BigDecimal divide = cost.divide(new BigDecimal(100), 6, RoundingMode.HALF_UP);
-
-                BigDecimal multiply = cost.multiply(new BigDecimal(100));
-
-                after.remove("cost");
-
-                after.put("cost",multiply);
-
-                keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
-
-                afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
-                        .replaceAll("\"show_count\":", "\"view_count\":")
-                        .replaceAll("\"click\":", "\"valid_click_count\":")
-                        .replaceAll("\"active\":", "\"from_follow_uv\":");
-
-                tableNameFinal = "daily_tt";
-            }
-
-            Object afterFinalJson = JSONObject.parse(afterFinal);
-            Object keyFinalJson = JSONObject.parse(keyFinal);
-
-            result.put("after", afterFinalJson);
-            result.put("key", keyFinalJson);
-            result.put("type", data.getString("type"));
-            result.put("tableName", tableNameFinal);
-            result.put("db", "quchen_text");
-
-            return result;
-        }
-    }
-}

+ 233 - 0
src/main/java/com/qucheng/game/data/oldsystem/ods/DailyCost.java

@@ -0,0 +1,233 @@
+package com.qucheng.game.data.oldsystem.ods;
+
+import com.qucheng.game.data.oldsystem.Env;
+import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.CostSink;
+import com.qucheng.game.data.oldsystem.util.ObjectUtil;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import io.debezium.data.Envelope;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ODS层数据填充
+ * 过滤字段及转换字段
+ */
+public class DailyCost {
+    private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
+            .savePath("")
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build() : FlinkAppConfigParam.builder()
+            .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + DailyCost.class.getSimpleName())
+            .interval(300L)
+            .timeout(300L)
+            .minBetween(1L)
+            .build();
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("HADOOP_USER_NAME", "flink");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 加载配置文件到 flink的全局配置中
+        Properties props = new Properties();
+        props.load(DailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+        Configuration configuration = new Configuration();
+        props.stringPropertyNames().forEach(key -> {
+            String value = props.getProperty(key);
+            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
+        });
+        env.getConfig().setGlobalJobParameters(configuration);
+        // 设置默认并行度
+        env.setParallelism(3);
+
+        // 任务失败后的重启策略
+        // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
+        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
+        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
+
+        // checkpoint配置
+        env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
+        // checkpoint执行超时时间,超时则 checkpoint失败
+        env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
+        // checkpoint执行最小间隔时间
+        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
+        // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
+        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+        // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
+        // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
+        // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
+        // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
+        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+        // 设置状态后端
+        env.setStateBackend(new HashMapStateBackend());
+        // 设置检查点目录
+        if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
+            env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
+        }
+
+        MysqlConfigParam qcMysqlConfig = MysqlConfigParam.builder()
+                .url(props.getProperty("mysql.quChengText.url"))
+                .username(props.getProperty("mysql.quChengText.username"))
+                .password(props.getProperty("mysql.quChengText.password"))
+                .build();
+
+        MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
+                .hostname(props.getProperty("cdc.mysql.backup.host"))
+                .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
+                .username(props.getProperty("cdc.mysql.backup.username"))
+                .password(props.getProperty("cdc.mysql.backup.password"))
+                .databaseList("ods")
+                .tableList((StringUtils.join(new String[]{
+                        /*"ods.byte_t_ad_data_day",
+                        "ods.t_mp_adgroups_data_day",*/
+                        "ods.t_gdt_adgroups_data_day",
+                }, ",")))
+                .deserializer(new MapDebeziumDeserializationSchema())
+                //5400 和 6400
+                .startupOptions(StartupOptions.initial())
+                .build();
+
+        env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source")
+                .flatMap(new FieldFilterMap()).addSink(new CostSink(qcMysqlConfig));
+
+        env.execute(DailyCost.class.getSimpleName());
+    }
+
+    public static class FieldFilterMap extends RichFlatMapFunction<TransportMap, TransportMap> {
+        private static final BigDecimal NUM_100 = new BigDecimal(100);
+
+        @Override
+        public void flatMap(TransportMap transportMap, Collector<TransportMap> out) throws Exception {
+            String dbName = transportMap.getDbName();
+            String tableName = transportMap.getTableName();
+            List<String> primaryKeys = transportMap.getPrimaryKeys();
+            Envelope.Operation operation = transportMap.getOperation();
+            Map<String, Object> data = transportMap.getAfter();
+            if (operation == Envelope.Operation.DELETE) {
+                data = transportMap.getBefore();
+            } else {
+                data = transportMap.getAfter();
+            }
+            if (dbName.equals("ods")) {
+                if (tableName.equals("byte_t_ad_data_day")) {
+                    oceanengineCost(operation, data, out);
+                } else if (tableName.equals("t_mp_adgroups_data_day")) {
+                    tencentMpCost(operation, data, out);
+                } else if (tableName.equals("t_gdt_adgroups_data_day")) {
+                    tencentGdtCost(operation, data, out);
+                }
+            }
+        }
+
+        private void oceanengineCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+            Map<String, Object> result = new HashMap<>();
+            result.put("account_id", accountId.toString());
+            result.put("ad_id", ObjectUtil.objToLong(data.get("ad_id")));
+            result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
+            result.put("cost", ObjectUtil.objToBigDecimal(data.get("cost")).multiply(NUM_100).intValue());
+            result.put("view_count", ObjectUtil.objToInteger(data.get("show_count")));
+            result.put("valid_click_count", ObjectUtil.objToInteger(data.get("click")));
+            result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
+            result.put("from_follow_uv", ObjectUtil.objToInteger(data.get("active")));
+            result.put("official_account_follow_rate", null);
+            result.put("order_amount", null);
+            result.put("order_roi", null);
+            result.put("order_count", null);
+            result.put("order_rate", null);
+            result.put("order_unit_price", null);
+            result.put("web_order_cost", null);
+            result.put("first_day_order_amount", null);
+            result.put("first_day_order_count", null);
+            result.put("belong_version", ObjectUtil.objToInteger(data.get("belong_version")));
+
+            out.collect(TransportMap.builder()
+                    .dbName("quchen_text")
+                    .tableName("daily_tt")
+                    .primaryKeys(Arrays.asList("account_id", "ad_id", "date"))
+                    .operation(operation)
+                    .after(result)
+                    .build());
+        }
+
+        private void tencentGdtCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+            Map<String, Object> result = new HashMap<>();
+            result.put("account_id", accountId.toString());
+            result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
+            result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
+            result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
+            result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
+            result.put("cpc", ObjectUtil.objToInteger(data.get("cpc")));
+            result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
+            result.put("web_order_count", null);
+            result.put("web_order_rate", null);
+            result.put("web_order_cost", null);
+            result.put("follow_count", ObjectUtil.objToInteger(data.get("biz_follow_count")));
+            result.put("order_amount", null);
+            result.put("order_roi", null);
+            result.put("platform_page_view_count", null);
+            result.put("web_commodity_page_view_count", null);
+            result.put("from_follow_uv", ObjectUtil.objToInteger(data.get("from_follow_uv")));
+
+            out.collect(TransportMap.builder()
+                    .dbName("quchen_text")
+                    .tableName("daily_qq")
+                    .primaryKeys(Arrays.asList("account_id", "date"))
+                    .operation(operation)
+                    .after(result)
+                    .build());
+        }
+
+        private void tencentMpCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
+            Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+            Map<String, Object> result = new HashMap<>();
+            result.put("account_id", accountId.toString());
+            result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
+            result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
+            result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
+            result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
+            result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
+            result.put("official_account_follow_rate", ObjectUtil.objToBigDecimal(data.get("official_account_follow_rate")));
+            result.put("order_amount", null);
+            result.put("order_roi", null);
+            result.put("order_count", null);
+            result.put("order_rate", null);
+            result.put("order_unit_price", null);
+            result.put("web_order_cost", null);
+            result.put("first_day_order_amount", null);
+            result.put("first_day_order_count", null);
+
+            out.collect(TransportMap.builder()
+                    .dbName("quchen_text")
+                    .tableName("daily_vx")
+                    .primaryKeys(Arrays.asList("account_id", "ad_id", "date"))
+                    .operation(operation)
+                    .after(result)
+                    .build());
+        }
+    }
+}

+ 61 - 0
src/main/java/com/qucheng/game/data/oldsystem/sink/CostSink.java

@@ -0,0 +1,61 @@
+package com.qucheng.game.data.oldsystem.sink;
+
+import com.qucheng.game.data.oldsystem.dao.mapper.BaseMapper;
+import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
+import com.qucheng.game.data.oldsystem.pojo.TransportMap;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+
+import javax.sql.DataSource;
+
+public class CostSink extends RichSinkFunction<TransportMap> {
+    private final MysqlConfigParam qcMysqlConfigParam;
+    protected SqlSessionFactory qcSqlSessionFactory;
+
+    public CostSink(MysqlConfigParam qcMysqlConfigParam) {
+        this.qcMysqlConfigParam = qcMysqlConfigParam;
+    }
+
+    @Override
+    public void open(Configuration configuration) {
+        qcSqlSessionFactory = sqlSessionFactory(qcMysqlConfigParam);
+    }
+
+    @Override
+    public void invoke(TransportMap value, Context context) {
+        try (SqlSession session = qcSqlSessionFactory.openSession(true)) {
+            BaseMapper mapper = session.getMapper(BaseMapper.class);
+            mapper.saveOrUpdate(value.getTableName(), value.getPrimaryKeys(), value.getAfter());
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private SqlSessionFactory sqlSessionFactory(MysqlConfigParam mysqlConfig) {
+        // 初始化连接池
+        HikariConfig config = new HikariConfig();
+        config.setDriverClassName(mysqlConfig.getDriverClassName());
+        config.setJdbcUrl(mysqlConfig.getUrl());
+        config.setUsername(mysqlConfig.getUsername());
+        config.setPassword(mysqlConfig.getPassword());
+        //连接池内保留的最少连接数
+        config.setMinimumIdle(1);
+        DataSource dataSource = new HikariDataSource(config);
+
+        Environment mybatisEnv = new Environment("ads-mysql", new JdbcTransactionFactory(), dataSource);
+
+        org.apache.ibatis.session.Configuration mybatisConfig = new org.apache.ibatis.session.Configuration(mybatisEnv);
+        mybatisConfig.addMapper(BaseMapper.class);
+        return new SqlSessionFactoryBuilder().build(mybatisConfig);
+    }
+}

+ 2 - 2
src/main/java/com/qucheng/game/data/oldsystem/sink/ads_sink.java

@@ -3,7 +3,7 @@ package com.qucheng.game.data.oldsystem.sink;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.qucheng.game.data.oldsystem.Env;
-import com.qucheng.game.data.oldsystem.ods.ByteDailyCost;
+import com.qucheng.game.data.oldsystem.ods.DailyCost;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
@@ -175,7 +175,7 @@ public class ads_sink extends RichSinkFunction<JSONObject> {
 
     private static Connection getConnection(BasicDataSource dataSource) throws IOException {
         Properties props = new Properties();
-        props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+        props.load(DailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
         dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
         //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
         dataSource.setUrl(props.getProperty("mysql.quChengText.url")); //test为数据库名

+ 3 - 0
src/main/java/com/qucheng/game/data/oldsystem/util/ObjectUtil.java

@@ -150,6 +150,9 @@ public class ObjectUtil {
             if (value instanceof Long) {
                 return ((Long) value).intValue();
             }
+            if (value instanceof Short) {
+                return ((Short) value).intValue();
+            }
             if (value instanceof Float) {
                 return ((Float) value).intValue();
             }