123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package com.qucheng.game.data.oldsystem.ods;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.qucheng.game.data.oldsystem.Env;
import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema;
import com.qucheng.game.data.oldsystem.sink.ads_sink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
- /**
- * ODS层数据填充
- * 过滤字段及转换字段
- */
- public class ByteDailyCost {
- private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
- .savePath("")
- .interval(300L)
- .timeout(300L)
- .minBetween(1L)
- .build() : FlinkAppConfigParam.builder()
- .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + ByteDailyCost.class.getSimpleName())
- .interval(300L)
- .timeout(300L)
- .minBetween(1L)
- .build();
- public static void main(String[] args) throws Exception {
- System.setProperty("HADOOP_USER_NAME", "flink");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 加载配置文件到 flink的全局配置中
- Properties props = new Properties();
- props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
- Configuration configuration = new Configuration();
- props.stringPropertyNames().forEach(key -> {
- String value = props.getProperty(key);
- configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
- });
- env.getConfig().setGlobalJobParameters(configuration);
- // 设置默认并行度
- env.setParallelism(1);
- // 任务失败后的重启策略
- // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
- // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
- env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
- // checkpoint配置
- env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
- // checkpoint执行超时时间,超时则 checkpoint失败
- env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
- // checkpoint执行最小间隔时间
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
- // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
- // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
- // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
- // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
- env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
- // 设置状态后端
- env.setStateBackend(new HashMapStateBackend());
- // 设置检查点目录
- if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
- env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
- }
- MySqlSource<String> mysqlCDCSource = MySqlSource.<String>builder()
- .hostname(props.getProperty("cdc.mysql.backup.host"))
- .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
- .username(props.getProperty("cdc.mysql.backup.username"))
- .password(props.getProperty("cdc.mysql.backup.password"))
- .databaseList("ods")
- .tableList((StringUtils.join(new String[]{
- "ods.byte_t_ad_data_day"
- }, ",")))
- .deserializer(new CustomerDeserializationSchema())
- //5400 和 6400
- .startupOptions(StartupOptions.initial())
- .build();
- DataStreamSource<String> dataStreamSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source");
- dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink());
- //4.启动任务
- env.execute("ByteDailyCost");
- }
- public static class FieldFilterMap implements MapFunction<String, JSONObject> {
- @Override
- public JSONObject map(String value) throws Exception {
- JSONObject result = new JSONObject();
- JSONObject data = JSON.parseObject(value);
- String tableName = data.getString("tableName");
- JSONObject after = data.getJSONObject("after");
- JSONObject key = data.getJSONObject("key");
- String afterFinal = "";
- String keyFinal = "";
- String tableNameFinal = "";
- if (tableName.equals("byte_t_ad_data_day")) {
- List<String> columns = Arrays.asList("account_id","ad_id", "day",
- "cost", "show_count", "click", "ctr","active");
- after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
- BigDecimal cost = after.getBigDecimal("cost");
- // BigDecimal divide = cost.divide(new BigDecimal(100), 6, RoundingMode.HALF_UP);
- BigDecimal multiply = cost.multiply(new BigDecimal(100));
- after.remove("cost");
- after.put("cost",multiply);
- keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
- afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
- .replaceAll("\"show_count\":", "\"view_count\":")
- .replaceAll("\"click\":", "\"valid_click_count\":")
- .replaceAll("\"active\":", "\"from_follow_uv\":");
- tableNameFinal = "daily_tt";
- }
- Object afterFinalJson = JSONObject.parse(afterFinal);
- Object keyFinalJson = JSONObject.parse(keyFinal);
- result.put("after", afterFinalJson);
- result.put("key", keyFinalJson);
- result.put("type", data.getString("type"));
- result.put("tableName", tableNameFinal);
- result.put("db", "quchen_text");
- return result;
- }
- }
- }
|