cola vor 2 Jahren
Ursprung
Commit
6296058d0a

+ 12 - 6
src/main/java/com/qucheng/game/data/oldsystem/ods/AdAccountCDC.java

@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
@@ -66,7 +67,7 @@ public class AdAccountCDC {
         props.load(AdAccountCDC.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
 
         // 设置默认并行度
-        env.setParallelism(3);
+        env.setParallelism(1);
 
         // 任务失败后的重启策略
         // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
@@ -100,7 +101,7 @@ public class AdAccountCDC {
                 .port(StringUtils.isBlank(props.getProperty("cdc.mysql.zx.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.zx.port")))
                 .username(props.getProperty("cdc.mysql.zx.username"))
                 .password(props.getProperty("cdc.mysql.zx.password"))
-                .databaseList("zx-advertising-oceanengine")
+                .databaseList("zx-advertising-oceanengine", "zx-advertising-tencent")
                 .tableList((StringUtils.join(new String[]{
                         "zx-advertising-oceanengine.t_clue_wechat_game",
                         "zx-advertising-oceanengine.t_ad_account",
@@ -135,9 +136,9 @@ public class AdAccountCDC {
                 .password(props.getProperty("mysql.quChengText.password"))
                 .build();
 
-        env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource")
-                .flatMap(new TableMap(odsMysqlConfig))
-                .addSink(new MpConfigAgentSink(bgNewSdkMysqlConfig, bgOldSdkMysqlConfig, qcMysqlConfig));
+        SingleOutputStreamOperator<Tuple3<String, String, TransportMap>> out = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "mysqlCDCSource")
+                .flatMap(new TableMap(odsMysqlConfig));
+        out.addSink(new MpConfigAgentSink(bgNewSdkMysqlConfig, bgOldSdkMysqlConfig, qcMysqlConfig));
 
         env.execute(AdAccountCDC.class.getSimpleName());
     }
@@ -326,12 +327,13 @@ public class AdAccountCDC {
                         path = (String) temp.get("mini_game_tracking_parameter");
                     }
                 } catch (Exception e) {
+                    e.printStackTrace();
                 }
             }
             if (StringUtils.isBlank(path)) {
                 return;
             }
-            String agentKey = path.replace("?state\\u003d", "");
+            String agentKey = path.replace("?state=", "");
             try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
                 BaseMapper mapper = session.getMapper(BaseMapper.class);
 
@@ -392,6 +394,10 @@ public class AdAccountCDC {
 
         private void tencentAccount(Envelope.Operation operation, Map<String, Object> data, Collector<Tuple3<String, String, TransportMap>> out) {
             Long accountId = ObjectUtil.objToLong(data.get("account_id"));
+            Long adAppId = ObjectUtil.objToLong(data.get("ad_app_id"));
+            if (adAppId != 3L) {
+                return;
+            }
 
             try (SqlSession session = odsSqlSessionFactory.openSession(true)) {
                 BaseMapper mapper = session.getMapper(BaseMapper.class);

+ 10 - 7
src/main/java/com/qucheng/game/data/oldsystem/ods/DailyCost.java

@@ -18,6 +18,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
@@ -59,7 +60,7 @@ public class DailyCost {
         });
         env.getConfig().setGlobalJobParameters(configuration);
         // 设置默认并行度
-        env.setParallelism(3);
+        env.setParallelism(1);
 
         // 任务失败后的重启策略
         // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
@@ -101,8 +102,7 @@ public class DailyCost {
                 .password(props.getProperty("cdc.mysql.backup.password"))
                 .databaseList("ods")
                 .tableList((StringUtils.join(new String[]{
-                        /*"ods.byte_t_ad_data_day",
-                        "ods.t_mp_adgroups_data_day",*/
+                        "ods.byte_t_ad_data_day",
                         "ods.t_gdt_adgroups_data_day",
                 }, ",")))
                 .deserializer(new MapDebeziumDeserializationSchema())
@@ -110,8 +110,9 @@ public class DailyCost {
                 .startupOptions(StartupOptions.initial())
                 .build();
 
-        env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source")
-                .flatMap(new FieldFilterMap()).addSink(new CostSink(qcMysqlConfig));
+        SingleOutputStreamOperator<TransportMap> out = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source")
+                .flatMap(new FieldFilterMap());
+        out.addSink(new CostSink(qcMysqlConfig));
 
         env.execute(DailyCost.class.getSimpleName());
     }
@@ -177,6 +178,7 @@ public class DailyCost {
             Long accountId = ObjectUtil.objToLong(data.get("account_id"));
             Map<String, Object> result = new HashMap<>();
             result.put("account_id", accountId.toString());
+            result.put("adgroup_id", ObjectUtil.objToLong(data.get("adgroup_id")));
             result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
             result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
             result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
@@ -196,7 +198,7 @@ public class DailyCost {
             out.collect(TransportMap.builder()
                     .dbName("quchen_text")
                     .tableName("daily_qq")
-                    .primaryKeys(Arrays.asList("account_id", "date"))
+                    .primaryKeys(Arrays.asList("account_id", "adgroup_id", "date"))
                     .operation(operation)
                     .after(result)
                     .build());
@@ -206,6 +208,7 @@ public class DailyCost {
             Long accountId = ObjectUtil.objToLong(data.get("account_id"));
             Map<String, Object> result = new HashMap<>();
             result.put("account_id", accountId.toString());
+            result.put("adgroup_id", ObjectUtil.objToLong(data.get("adgroup_id")));
             result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
             result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
             result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
@@ -224,7 +227,7 @@ public class DailyCost {
             out.collect(TransportMap.builder()
                     .dbName("quchen_text")
                     .tableName("daily_vx")
-                    .primaryKeys(Arrays.asList("account_id", "ad_id", "date"))
+                    .primaryKeys(Arrays.asList("account_id", "adgroup_id", "date"))
                     .operation(operation)
                     .after(result)
                     .build());