ByteDailyCost.java 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package com.qucheng.game.data.oldsystem.ods;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.qucheng.game.data.oldsystem.Env;
import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema;
import com.qucheng.game.data.oldsystem.sink.ads_sink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
  27. /**
  28. * ODS层数据填充
  29. * 过滤字段及转换字段
  30. */
  31. public class ByteDailyCost {
  32. private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
  33. .savePath("")
  34. .interval(300L)
  35. .timeout(300L)
  36. .minBetween(1L)
  37. .build() : FlinkAppConfigParam.builder()
  38. .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + ByteDailyCost.class.getSimpleName())
  39. .interval(300L)
  40. .timeout(300L)
  41. .minBetween(1L)
  42. .build();
  43. public static void main(String[] args) throws Exception {
  44. System.setProperty("HADOOP_USER_NAME", "flink");
  45. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  46. // 加载配置文件到 flink的全局配置中
  47. Properties props = new Properties();
  48. props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
  49. Configuration configuration = new Configuration();
  50. props.stringPropertyNames().forEach(key -> {
  51. String value = props.getProperty(key);
  52. configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
  53. });
  54. env.getConfig().setGlobalJobParameters(configuration);
  55. // 设置默认并行度
  56. env.setParallelism(1);
  57. // 任务失败后的重启策略
  58. // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
  59. // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
  60. env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
  61. // checkpoint配置
  62. env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
  63. // checkpoint执行超时时间,超时则 checkpoint失败
  64. env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
  65. // checkpoint执行最小间隔时间
  66. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
  67. // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
  68. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  69. // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
  70. // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
  71. // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
  72. // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
  73. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  74. // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
  75. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
  76. // 设置状态后端
  77. env.setStateBackend(new HashMapStateBackend());
  78. // 设置检查点目录
  79. if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
  80. env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
  81. }
  82. MySqlSource<String> mysqlCDCSource = MySqlSource.<String>builder()
  83. .hostname(props.getProperty("cdc.mysql.backup.host"))
  84. .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
  85. .username(props.getProperty("cdc.mysql.backup.username"))
  86. .password(props.getProperty("cdc.mysql.backup.password"))
  87. .databaseList("ods")
  88. .tableList((StringUtils.join(new String[]{
  89. "ods.byte_t_ad_data_day"
  90. }, ",")))
  91. .deserializer(new CustomerDeserializationSchema())
  92. //5400 和 6400
  93. .startupOptions(StartupOptions.initial())
  94. .build();
  95. DataStreamSource<String> dataStreamSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source");
  96. dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink());
  97. //4.启动任务
  98. env.execute("ByteDailyCost");
  99. }
  100. public static class FieldFilterMap implements MapFunction<String, JSONObject> {
  101. @Override
  102. public JSONObject map(String value) throws Exception {
  103. JSONObject result = new JSONObject();
  104. JSONObject data = JSON.parseObject(value);
  105. String tableName = data.getString("tableName");
  106. JSONObject after = data.getJSONObject("after");
  107. JSONObject key = data.getJSONObject("key");
  108. String afterFinal = "";
  109. String keyFinal = "";
  110. String tableNameFinal = "";
  111. if (tableName.equals("byte_t_ad_data_day")) {
  112. List<String> columns = Arrays.asList("account_id","ad_id", "day",
  113. "cost", "show_count", "click", "ctr","active");
  114. after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
  115. BigDecimal cost = after.getBigDecimal("cost");
  116. // BigDecimal divide = cost.divide(new BigDecimal(100), 6, RoundingMode.HALF_UP);
  117. BigDecimal multiply = cost.multiply(new BigDecimal(100));
  118. after.remove("cost");
  119. after.put("cost",multiply);
  120. keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
  121. afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
  122. .replaceAll("\"show_count\":", "\"view_count\":")
  123. .replaceAll("\"click\":", "\"valid_click_count\":")
  124. .replaceAll("\"active\":", "\"from_follow_uv\":");
  125. tableNameFinal = "daily_tt";
  126. }
  127. Object afterFinalJson = JSONObject.parse(afterFinal);
  128. Object keyFinalJson = JSONObject.parse(keyFinal);
  129. result.put("after", afterFinalJson);
  130. result.put("key", keyFinalJson);
  131. result.put("type", data.getString("type"));
  132. result.put("tableName", tableNameFinal);
  133. result.put("db", "quchen_text");
  134. return result;
  135. }
  136. }
  137. }