DailyCost.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package com.qucheng.game.data.oldsystem.ods;
  2. import com.qucheng.game.data.oldsystem.Env;
  3. import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
  4. import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
  5. import com.qucheng.game.data.oldsystem.pojo.TransportMap;
  6. import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
  7. import com.qucheng.game.data.oldsystem.sink.CostSink;
  8. import com.qucheng.game.data.oldsystem.util.ObjectUtil;
  9. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  10. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
  11. import io.debezium.data.Envelope;
  12. import org.apache.commons.lang3.StringUtils;
  13. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  14. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  15. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  16. import org.apache.flink.api.common.time.Time;
  17. import org.apache.flink.configuration.Configuration;
  18. import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
  19. import org.apache.flink.streaming.api.CheckpointingMode;
  20. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  21. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  22. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  23. import org.apache.flink.util.Collector;
  24. import java.math.BigDecimal;
  25. import java.time.LocalDateTime;
  26. import java.time.LocalTime;
  27. import java.util.*;
  28. import java.util.concurrent.TimeUnit;
  29. /**
  30. * ODS层数据填充
  31. * 过滤字段及转换字段
  32. */
  33. public class DailyCost {
  34. private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
  35. .savePath("")
  36. .interval(300L)
  37. .timeout(300L)
  38. .minBetween(1L)
  39. .build() : FlinkAppConfigParam.builder()
  40. .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + DailyCost.class.getSimpleName())
  41. .interval(300L)
  42. .timeout(300L)
  43. .minBetween(1L)
  44. .build();
  45. public static void main(String[] args) throws Exception {
  46. System.setProperty("HADOOP_USER_NAME", "flink");
  47. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  48. // 加载配置文件到 flink的全局配置中
  49. Properties props = new Properties();
  50. props.load(DailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
  51. Configuration configuration = new Configuration();
  52. props.stringPropertyNames().forEach(key -> {
  53. String value = props.getProperty(key);
  54. configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
  55. });
  56. env.getConfig().setGlobalJobParameters(configuration);
  57. // 设置默认并行度
  58. env.setParallelism(1);
  59. // 任务失败后的重启策略
  60. // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
  61. // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
  62. env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
  63. // checkpoint配置
  64. env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
  65. // checkpoint执行超时时间,超时则 checkpoint失败
  66. env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
  67. // checkpoint执行最小间隔时间
  68. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
  69. // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
  70. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  71. // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
  72. // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
  73. // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
  74. // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
  75. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  76. // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
  77. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
  78. // 设置状态后端
  79. env.setStateBackend(new HashMapStateBackend());
  80. // 设置检查点目录
  81. if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
  82. env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
  83. }
  84. MysqlConfigParam qcMysqlConfig = MysqlConfigParam.builder()
  85. .url(props.getProperty("mysql.quChengText.url"))
  86. .username(props.getProperty("mysql.quChengText.username"))
  87. .password(props.getProperty("mysql.quChengText.password"))
  88. .build();
  89. MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
  90. .hostname(props.getProperty("cdc.mysql.backup.host"))
  91. .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
  92. .username(props.getProperty("cdc.mysql.backup.username"))
  93. .password(props.getProperty("cdc.mysql.backup.password"))
  94. .databaseList("ods")
  95. .tableList((StringUtils.join(new String[]{
  96. "ods.byte_t_ad_data_day",
  97. "ods.t_gdt_adgroups_data_day",
  98. }, ",")))
  99. .deserializer(new MapDebeziumDeserializationSchema())
  100. //5400 和 6400
  101. .startupOptions(StartupOptions.initial())
  102. .build();
  103. SingleOutputStreamOperator<TransportMap> out = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source")
  104. .flatMap(new FieldFilterMap());
  105. out.addSink(new CostSink(qcMysqlConfig));
  106. env.execute(DailyCost.class.getSimpleName());
  107. }
  108. public static class FieldFilterMap extends RichFlatMapFunction<TransportMap, TransportMap> {
  109. private static final BigDecimal NUM_100 = new BigDecimal(100);
  110. @Override
  111. public void flatMap(TransportMap transportMap, Collector<TransportMap> out) throws Exception {
  112. String dbName = transportMap.getDbName();
  113. String tableName = transportMap.getTableName();
  114. List<String> primaryKeys = transportMap.getPrimaryKeys();
  115. Envelope.Operation operation = transportMap.getOperation();
  116. Map<String, Object> data = transportMap.getAfter();
  117. if (operation == Envelope.Operation.DELETE) {
  118. data = transportMap.getBefore();
  119. } else {
  120. data = transportMap.getAfter();
  121. }
  122. if (dbName.equals("ods")) {
  123. if (tableName.equals("byte_t_ad_data_day")) {
  124. oceanengineCost(operation, data, out);
  125. } else if (tableName.equals("t_mp_adgroups_data_day")) {
  126. tencentMpCost(operation, data, out);
  127. } else if (tableName.equals("t_gdt_adgroups_data_day")) {
  128. tencentGdtCost(operation, data, out);
  129. }
  130. }
  131. }
  132. private void oceanengineCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
  133. Long accountId = ObjectUtil.objToLong(data.get("account_id"));
  134. Map<String, Object> result = new HashMap<>();
  135. result.put("account_id", accountId.toString());
  136. result.put("ad_id", ObjectUtil.objToLong(data.get("ad_id")));
  137. result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
  138. result.put("cost", ObjectUtil.objToBigDecimal(data.get("cost")).multiply(NUM_100).intValue());
  139. result.put("view_count", ObjectUtil.objToInteger(data.get("show_count")));
  140. result.put("valid_click_count", ObjectUtil.objToInteger(data.get("click")));
  141. result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
  142. result.put("from_follow_uv", ObjectUtil.objToInteger(data.get("active")));
  143. result.put("official_account_follow_rate", null);
  144. result.put("order_amount", null);
  145. result.put("order_roi", null);
  146. result.put("order_count", null);
  147. result.put("order_rate", null);
  148. result.put("order_unit_price", null);
  149. result.put("web_order_cost", null);
  150. result.put("first_day_order_amount", null);
  151. result.put("first_day_order_count", null);
  152. result.put("belong_version", ObjectUtil.objToInteger(data.get("belong_version")));
  153. out.collect(TransportMap.builder()
  154. .dbName("quchen_text")
  155. .tableName("daily_tt")
  156. .primaryKeys(Arrays.asList("account_id", "ad_id", "date"))
  157. .operation(operation)
  158. .after(result)
  159. .build());
  160. }
  161. private void tencentGdtCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
  162. Long accountId = ObjectUtil.objToLong(data.get("account_id"));
  163. Map<String, Object> result = new HashMap<>();
  164. result.put("account_id", accountId.toString());
  165. result.put("adgroup_id", ObjectUtil.objToLong(data.get("adgroup_id")));
  166. result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
  167. result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
  168. result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
  169. result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
  170. result.put("cpc", ObjectUtil.objToInteger(data.get("cpc")));
  171. result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
  172. result.put("web_order_count", null);
  173. result.put("web_order_rate", null);
  174. result.put("web_order_cost", null);
  175. result.put("follow_count", ObjectUtil.objToInteger(data.get("biz_follow_count")));
  176. result.put("order_amount", null);
  177. result.put("order_roi", null);
  178. result.put("platform_page_view_count", null);
  179. result.put("web_commodity_page_view_count", null);
  180. result.put("from_follow_uv", ObjectUtil.objToInteger(data.get("from_follow_uv")));
  181. out.collect(TransportMap.builder()
  182. .dbName("quchen_text")
  183. .tableName("daily_qq")
  184. .primaryKeys(Arrays.asList("account_id", "adgroup_id", "date"))
  185. .operation(operation)
  186. .after(result)
  187. .build());
  188. }
  189. private void tencentMpCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
  190. Long accountId = ObjectUtil.objToLong(data.get("account_id"));
  191. Map<String, Object> result = new HashMap<>();
  192. result.put("account_id", accountId.toString());
  193. result.put("adgroup_id", ObjectUtil.objToLong(data.get("adgroup_id")));
  194. result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
  195. result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
  196. result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
  197. result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
  198. result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
  199. result.put("official_account_follow_rate", ObjectUtil.objToBigDecimal(data.get("official_account_follow_rate")));
  200. result.put("order_amount", null);
  201. result.put("order_roi", null);
  202. result.put("order_count", null);
  203. result.put("order_rate", null);
  204. result.put("order_unit_price", null);
  205. result.put("web_order_cost", null);
  206. result.put("first_day_order_amount", null);
  207. result.put("first_day_order_count", null);
  208. out.collect(TransportMap.builder()
  209. .dbName("quchen_text")
  210. .tableName("daily_vx")
  211. .primaryKeys(Arrays.asList("account_id", "adgroup_id", "date"))
  212. .operation(operation)
  213. .after(result)
  214. .build());
  215. }
  216. }
  217. }