Browse Source

新增头条消耗CDC

cola 2 năm trước cách đây
mục cha
commit
031b821104

+ 1 - 1
src/main/java/com/qucheng/game/data/oldsystem/Env.java

@@ -1,5 +1,5 @@
 package com.qucheng.game.data.oldsystem;
 
 public class Env {
-    public static final boolean isTest = true;
+    public static final boolean isTest = false;
 }

+ 0 - 14
src/main/java/com/qucheng/game/data/oldsystem/common/qcConfig.java

@@ -1,14 +0,0 @@
-package com.qucheng.game.data.oldsystem.common;
-
-public class qcConfig {
-
-    public static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
-
-    // MYSQL 大数据平台连接方式
-    // mysql url
-    //url增加autoReconnect=true,否则会有连接超时
-    public static final String MYSQL_URL_BIGDATA = "jdbc:mysql://qc-game-cluster.rwlb.rds.aliyuncs.com:3306/quchen_text?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";
-    public static final String MYSQL_USERNAME_BIGDATA = "qc";
-    public static final String MYSQL_PASSWORD_BIGDATA = "Qc_1234567";
-
-}

+ 58 - 41
src/main/java/com/qucheng/game/data/oldsystem/ods/ByteDailyCost.java

@@ -1,11 +1,11 @@
 package com.qucheng.game.data.oldsystem.ods;
 
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.qucheng.game.data.oldsystem.Env;
 import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
-import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
-import com.qucheng.game.data.oldsystem.pojo.TransportMap;
-import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
-import com.qucheng.game.data.oldsystem.sink.CdcMysqlTablesSink;
+import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.ads_sink;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.commons.lang3.StringUtils;
@@ -21,11 +21,14 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * ODS层数据填充
+ * 过滤字段及转换字段
+ */
 public class ByteDailyCost {
     private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
             .savePath("")
@@ -82,7 +85,7 @@ public class ByteDailyCost {
             env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
         }
 
-        MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
+        MySqlSource<String> mysqlCDCSource = MySqlSource.<String>builder()
                 .hostname(props.getProperty("cdc.mysql.backup.host"))
                 .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
                 .username(props.getProperty("cdc.mysql.backup.username"))
@@ -91,48 +94,62 @@ public class ByteDailyCost {
                 .tableList((StringUtils.join(new String[]{
                         "ods.byte_t_ad_data_day"
                 }, ",")))
-                .deserializer(new MapDebeziumDeserializationSchema())
+                .deserializer(new CustomerDeserializationSchema())
+                //5400 和 6400
                 .startupOptions(StartupOptions.initial())
                 .build();
 
-        MysqlConfigParam sinkMysqlConfig = MysqlConfigParam.builder()
-                .url(props.getProperty("mysql.quChengText.url"))
-                .username(props.getProperty("mysql.quChengText.username"))
-                .password(props.getProperty("mysql.quChengText.password"))
-                .build();
+        DataStreamSource<String> dataStreamSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source");
+
+        dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink());
 
-        DataStreamSource<TransportMap> cdcSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource");
-        cdcSource.print();
-        cdcSource.map(new FieldFilterMap())
-                .addSink(new CdcMysqlTablesSink(sinkMysqlConfig));
 
-        env.execute(ByteDailyCost.class.getSimpleName());
+        //4.启动任务
+        env.execute("ByteDailyCost");
     }
 
-    /**
-     * 字段过滤和映射
-     */
-    public static class FieldFilterMap implements MapFunction<TransportMap, TransportMap> {
+    public static class FieldFilterMap implements MapFunction<String, JSONObject> {
         @Override
-        public TransportMap map(TransportMap transportMap) throws Exception {
-            if (transportMap.getAfter() == null) {
-                return transportMap;
-            }
-            Map<String, Object> after = transportMap.getAfter();
-            if (transportMap.getTableName().equalsIgnoreCase("byte_t_ad_data_day")) {
-                Map<String, Object> pojo = new HashMap<>(after.size());
-                pojo.put("account_id",after.get("account_id"));
-                pojo.put("date",after.get("day"));
-                pojo.put("cost",after.get("cost"));
-                pojo.put("view_count",after.get("show"));
-                pojo.put("valid_click_count",after.get("click"));
-                pojo.put("ctr",after.get("ctr"));
-                after = pojo;
-                transportMap.setTableName("daily_tt");
-                transportMap.setPrimaryKeys(Arrays.asList("account_id", "day"));
-                transportMap.setAfter(after);
+        public JSONObject map(String value) throws Exception {
+
+            JSONObject result = new JSONObject();
+            JSONObject data = JSON.parseObject(value);
+
+            String tableName = data.getString("tableName");
+            JSONObject after = data.getJSONObject("after");
+            JSONObject key = data.getJSONObject("key");
+            String afterFinal = "";
+            String keyFinal = "";
+            String tableNameFinal = "";
+
+            if (tableName.equals("byte_t_ad_data_day")) {
+
+                List<String> columns = Arrays.asList("account_id","ad_id", "day",
+                        "cost", "show_count", "click", "ctr","active");
+
+                after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
+
+                keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
+
+                afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
+                        .replaceAll("\"show_count\":", "\"view_count\":")
+                        .replaceAll("\"click\":", "\"valid_click_count\":")
+                        .replaceAll("\"active\":", "\"from_follow_uv\":");
+
+
+                tableNameFinal = "daily_tt";
             }
-            return transportMap;
+
+            Object afterFinalJson = JSONObject.parse(afterFinal);
+            Object keyFinalJson = JSONObject.parse(keyFinal);
+
+            result.put("after", afterFinalJson);
+            result.put("key", keyFinalJson);
+            result.put("type", data.getString("type"));
+            result.put("tableName", tableNameFinal);
+            result.put("db", "quchen_text");
+
+            return result;
         }
     }
-}
+}

+ 0 - 97
src/main/java/com/qucheng/game/data/oldsystem/ods/OdsDataFilling.java

@@ -1,97 +0,0 @@
-package com.qucheng.game.data.oldsystem.ods;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema;
-import com.qucheng.game.data.oldsystem.sink.ads_sink;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * ODS层数据填充
- * 过滤字段及转换字段
- */
-public class OdsDataFilling {
-
-    public static void main(String[] args) throws Exception {
-        System.setProperty("HADOOP_USER_NAME", "flink");
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-
-        //2.通过FlinkCDC构建SourceFunction并读取数据
-        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
-                .hostname("118.31.103.66")
-                .port(3306)
-                .username("root")
-                .password("Qc_1234567")
-                .databaseList("ods")
-                .tableList("ods.byte_t_ad_data_day")
-                .deserializer(new CustomerDeserializationSchema())
-                //5400 和 6400
-                .startupOptions(StartupOptions.initial())
-                .build();
-
-        DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source");
-
-        dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink());
-
-
-        //4.启动任务
-        env.execute("GAME_ODS_FILLING");
-    }
-
-    public static class FieldFilterMap implements MapFunction<String, JSONObject> {
-        @Override
-        public JSONObject map(String value) throws Exception {
-
-            JSONObject result = new JSONObject();
-            JSONObject data = JSON.parseObject(value);
-
-            String tableName = data.getString("tableName");
-            JSONObject after = data.getJSONObject("after");
-            JSONObject key = data.getJSONObject("key");
-            String afterFinal = "";
-            String keyFinal = "";
-            String tableNameFinal = "";
-
-            if (tableName.equals("byte_t_ad_data_day")) {
-
-                List<String> columns = Arrays.asList("account_id", "day",
-                        "cost", "show", "click", "ctr");
-
-                after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
-
-                key.remove("ad_id");
-
-                keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
-
-                afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
-                        .replaceAll("\"show\":", "\"view_count\":")
-                        .replaceAll("\"click\":", "\"valid_click_count\":");
-
-
-                tableNameFinal = "daily_tt";
-            }
-
-            Object afterFinalJson = JSONObject.parse(afterFinal);
-            Object keyFinalJson = JSONObject.parse(keyFinal);
-
-            result.put("after", afterFinalJson);
-            result.put("key", keyFinalJson);
-            result.put("type", data.getString("type"));
-            result.put("tableName", tableNameFinal);
-            result.put("db", "quchen_text");
-
-            return result;
-        }
-    }
-}

+ 9 - 6
src/main/java/com/qucheng/game/data/oldsystem/ods/OldGameSystemCDCBgNew.java

@@ -89,12 +89,12 @@ public class OldGameSystemCDCBgNew {
                 .databaseList("bg_new_sdk_db_mp")
                 .tableList((StringUtils.join(new String[]{
                         "bg_new_sdk_db_mp.h_game",
-                        "bg_new_sdk_db_mp.h_mem_game",
-                        "bg_new_sdk_db_mp.h_member",
                         "bg_new_sdk_db_mp.h_mg_role",
                         "bg_new_sdk_db_mp.h_pay",
                         "bg_new_sdk_db_mp.h_pay_ext",
-                        "bg_new_sdk_db_mp.h_user"
+                        "bg_new_sdk_db_mp.qc_h_mem_game",
+                        "bg_new_sdk_db_mp.qc_h_member",
+                        "bg_new_sdk_db_mp.qc_h_user"
                 }, ",")))
                 .deserializer(new MapDebeziumDeserializationSchema())
                 .startupOptions(StartupOptions.latest())
@@ -168,7 +168,7 @@ public class OldGameSystemCDCBgNew {
                     pojo.put(field, after.get(field));
                 }
                 after = pojo;
-            } else if (transportMap.getTableName().equalsIgnoreCase("h_mem_game")) {
+            } else if (transportMap.getTableName().equalsIgnoreCase("qc_h_mem_game")) {
                 Map<String, Object> pojo = new HashMap<>(after.size());
                 for (String field : Arrays.asList(
                         "id",
@@ -184,9 +184,10 @@ public class OldGameSystemCDCBgNew {
                         "status",
                         "sum_money")) {
                     pojo.put(field, after.get(field));
+                    transportMap.setTableName("h_mem_game");
                 }
                 after = pojo;
-            } else if (transportMap.getTableName().equalsIgnoreCase("h_member")) {
+            } else if (transportMap.getTableName().equalsIgnoreCase("qc_h_member")) {
                 Map<String, Object> pojo = new HashMap<>(after.size());
                 for (String field : Arrays.asList(
                         "id",
@@ -200,6 +201,7 @@ public class OldGameSystemCDCBgNew {
                         "update_time",
                         "username")) {
                     pojo.put(field, after.get(field));
+                    transportMap.setTableName("h_member");
                 }
                 after = pojo;
             } else if (transportMap.getTableName().equalsIgnoreCase("h_mg_role")) {
@@ -308,7 +310,7 @@ public class OldGameSystemCDCBgNew {
                     pojo.put(field, after.get(field));
                 }
                 after = pojo;
-            } else if (transportMap.getTableName().equalsIgnoreCase("h_user")) {
+            } else if (transportMap.getTableName().equalsIgnoreCase("qc_h_user")) {
                 Map<String, Object> pojo = new HashMap<>(after.size());
                 for (String field : Arrays.asList("id",
                         "user_login",
@@ -321,6 +323,7 @@ public class OldGameSystemCDCBgNew {
                         "parent_account_id",
                         "cp_id")) {
                     pojo.put(field, after.get(field));
+                    transportMap.setTableName("h_user");
                 }
                 after = pojo;
             }

+ 9 - 6
src/main/java/com/qucheng/game/data/oldsystem/ods/OldGameSystemCDCBgOld.java

@@ -89,12 +89,12 @@ public class OldGameSystemCDCBgOld {
                 .databaseList("bg_old_sdk_db_mp")
                 .tableList((StringUtils.join(new String[]{
                         "bg_old_sdk_db_mp.h_game",
-                        "bg_old_sdk_db_mp.h_mem_game",
-                        "bg_old_sdk_db_mp.h_member",
                         "bg_old_sdk_db_mp.h_mg_role",
                         "bg_old_sdk_db_mp.h_pay",
                         "bg_old_sdk_db_mp.h_pay_ext",
-                        "bg_old_sdk_db_mp.h_user"
+                        "bg_old_sdk_db_mp.qc_h_mem_game",
+                        "bg_old_sdk_db_mp.qc_h_member",
+                        "bg_old_sdk_db_mp.qc_h_user"
                 }, ",")))
                 .deserializer(new MapDebeziumDeserializationSchema())
                 .startupOptions(StartupOptions.latest())
@@ -168,7 +168,7 @@ public class OldGameSystemCDCBgOld {
                     pojo.put(field, after.get(field));
                 }
                 after = pojo;
-            } else if (transportMap.getTableName().equalsIgnoreCase("h_mem_game")) {
+            } else if (transportMap.getTableName().equalsIgnoreCase("qc_h_mem_game")) {
                 Map<String, Object> pojo = new HashMap<>(after.size());
                 for (String field : Arrays.asList(
                         "id",
@@ -184,9 +184,10 @@ public class OldGameSystemCDCBgOld {
                         "status",
                         "sum_money")) {
                     pojo.put(field, after.get(field));
+                    transportMap.setTableName("h_mem_game");
                 }
                 after = pojo;
-            } else if (transportMap.getTableName().equalsIgnoreCase("h_member")) {
+            } else if (transportMap.getTableName().equalsIgnoreCase("qc_h_member")) {
                 Map<String, Object> pojo = new HashMap<>(after.size());
                 for (String field : Arrays.asList(
                         "id",
@@ -200,6 +201,7 @@ public class OldGameSystemCDCBgOld {
                         "update_time",
                         "username")) {
                     pojo.put(field, after.get(field));
+                    transportMap.setTableName("h_member");
                 }
                 after = pojo;
             } else if (transportMap.getTableName().equalsIgnoreCase("h_mg_role")) {
@@ -308,7 +310,7 @@ public class OldGameSystemCDCBgOld {
                     pojo.put(field, after.get(field));
                 }
                 after = pojo;
-            } else if (transportMap.getTableName().equalsIgnoreCase("h_user")) {
+            } else if (transportMap.getTableName().equalsIgnoreCase("qc_h_user")) {
                 Map<String, Object> pojo = new HashMap<>(after.size());
                 for (String field : Arrays.asList("id",
                         "user_login",
@@ -321,6 +323,7 @@ public class OldGameSystemCDCBgOld {
                         "parent_account_id",
                         "cp_id")) {
                     pojo.put(field, after.get(field));
+                    transportMap.setTableName("h_user");
                 }
                 after = pojo;
             }

+ 11 - 6
src/main/java/com/qucheng/game/data/oldsystem/sink/ads_sink.java

@@ -2,19 +2,22 @@ package com.qucheng.game.data.oldsystem.sink;
 
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
+import com.qucheng.game.data.oldsystem.Env;
+import com.qucheng.game.data.oldsystem.ods.ByteDailyCost;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
-import static com.qucheng.game.data.oldsystem.common.qcConfig.*;
 
 public class ads_sink extends RichSinkFunction<JSONObject> {
 
@@ -170,12 +173,14 @@ public class ads_sink extends RichSinkFunction<JSONObject> {
 
     }
 
-    private static Connection getConnection(BasicDataSource dataSource) {
-        dataSource.setDriverClassName(MYSQL_DRIVER);
+    private static Connection getConnection(BasicDataSource dataSource) throws IOException {
+        Properties props = new Properties();
+        props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
+        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
         //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
-        dataSource.setUrl(MYSQL_URL_BIGDATA); //test为数据库名
-        dataSource.setUsername(MYSQL_USERNAME_BIGDATA); //数据库用户名
-        dataSource.setPassword(MYSQL_PASSWORD_BIGDATA); //数据库密码
+        dataSource.setUrl(props.getProperty("mysql.quChengText.url")); //test为数据库名
+        dataSource.setUsername(props.getProperty("mysql.quChengText.username")); //数据库用户名
+        dataSource.setPassword(props.getProperty("mysql.quChengText.password")); //数据库密码
         //设置连接池的一些参数
         // 设置初始化连接池时池中连接的数量
         dataSource.setInitialSize(10);