package com.qucheng.game.data.oldsystem.ods; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.qucheng.game.data.oldsystem.Env; import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam; import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema; import com.qucheng.game.data.oldsystem.sink.ads_sink; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * ODS层数据填充 * 过滤字段及转换字段 */ public class ByteDailyCost { private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder() .savePath("") .interval(300L) .timeout(300L) .minBetween(1L) .build() : FlinkAppConfigParam.builder() .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + ByteDailyCost.class.getSimpleName()) .interval(300L) .timeout(300L) .minBetween(1L) .build(); public static void main(String[] args) throws Exception { System.setProperty("HADOOP_USER_NAME", "flink"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 加载配置文件到 flink的全局配置中 Properties props = new Properties(); props.load(ByteDailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties")); Configuration configuration = new Configuration(); props.stringPropertyNames().forEach(key -> { String value = props.getProperty(key); configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim()); }); env.getConfig().setGlobalJobParameters(configuration); // 设置默认并行度 env.setParallelism(1); // 任务失败后的重启策略 // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启 // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间 env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s // checkpoint配置 env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE); // checkpoint执行超时时间,超时则 checkpoint失败 env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000); // checkpoint执行最小间隔时间 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000); // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1) env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。 // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理 // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留 // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); // 设置状态后端 env.setStateBackend(new HashMapStateBackend()); // 设置检查点目录 if (StringUtils.isNotBlank(appConfigParam.getSavePath())) { env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath()); } MySqlSource mysqlCDCSource = MySqlSource.builder() .hostname(props.getProperty("cdc.mysql.backup.host")) .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port"))) .username(props.getProperty("cdc.mysql.backup.username")) .password(props.getProperty("cdc.mysql.backup.password")) .databaseList("ods") .tableList((StringUtils.join(new String[]{ "ods.byte_t_ad_data_day" }, ","))) .deserializer(new CustomerDeserializationSchema()) //5400 和 6400 .startupOptions(StartupOptions.initial()) .build(); DataStreamSource dataStreamSource = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source"); dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink()); //4.启动任务 env.execute("ByteDailyCost"); } public static class FieldFilterMap implements MapFunction { @Override public JSONObject map(String value) throws Exception { JSONObject result = new JSONObject(); JSONObject data = JSON.parseObject(value); String tableName = data.getString("tableName"); JSONObject after = data.getJSONObject("after"); JSONObject key = data.getJSONObject("key"); String afterFinal = ""; String keyFinal = ""; String tableNameFinal = ""; if (tableName.equals("byte_t_ad_data_day")) { List columns = Arrays.asList("account_id","ad_id", "day", "cost", "show_count", "click", "ctr","active"); after.entrySet().removeIf(next -> !columns.contains(next.getKey())); BigDecimal cost = after.getBigDecimal("cost"); // BigDecimal divide = cost.divide(new BigDecimal(100), 6, RoundingMode.HALF_UP); BigDecimal multiply = cost.multiply(new BigDecimal(100)); after.remove("cost"); after.put("cost",multiply); keyFinal = key.toString().replaceAll("\"day\":", "\"date\":"); afterFinal = after.toString().replaceAll("\"day\":", "\"date\":") .replaceAll("\"show_count\":", "\"view_count\":") .replaceAll("\"click\":", "\"valid_click_count\":") .replaceAll("\"active\":", "\"from_follow_uv\":"); tableNameFinal = "daily_tt"; } Object afterFinalJson = JSONObject.parse(afterFinal); Object keyFinalJson = JSONObject.parse(keyFinal); result.put("after", afterFinalJson); result.put("key", keyFinalJson); result.put("type", data.getString("type")); result.put("tableName", tableNameFinal); result.put("db", "quchen_text"); return result; } } }