123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- package com.qucheng.game.data.oldsystem.ods;
- import com.qucheng.game.data.oldsystem.Env;
- import com.qucheng.game.data.oldsystem.pojo.FlinkAppConfigParam;
- import com.qucheng.game.data.oldsystem.pojo.MysqlConfigParam;
- import com.qucheng.game.data.oldsystem.pojo.TransportMap;
- import com.qucheng.game.data.oldsystem.serialization.MapDebeziumDeserializationSchema;
- import com.qucheng.game.data.oldsystem.sink.CostSink;
- import com.qucheng.game.data.oldsystem.util.ObjectUtil;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import io.debezium.data.Envelope;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.RichFlatMapFunction;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
- import org.apache.flink.streaming.api.CheckpointingMode;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.CheckpointConfig;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- import java.math.BigDecimal;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- /**
- * ODS层数据填充
- * 过滤字段及转换字段
- */
- public class DailyCost {
- private static final FlinkAppConfigParam appConfigParam = Env.isTest ? FlinkAppConfigParam.builder()
- .savePath("")
- .interval(300L)
- .timeout(300L)
- .minBetween(1L)
- .build() : FlinkAppConfigParam.builder()
- .savePath("hdfs://nameservice1:8020/user/limeng/cluster_yarn/checkpoints/ODS/" + DailyCost.class.getSimpleName())
- .interval(300L)
- .timeout(300L)
- .minBetween(1L)
- .build();
- public static void main(String[] args) throws Exception {
- System.setProperty("HADOOP_USER_NAME", "flink");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 加载配置文件到 flink的全局配置中
- Properties props = new Properties();
- props.load(DailyCost.class.getResourceAsStream(Env.isTest ? "/application.test.properties" : "/application.properties"));
- Configuration configuration = new Configuration();
- props.stringPropertyNames().forEach(key -> {
- String value = props.getProperty(key);
- configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
- });
- env.getConfig().setGlobalJobParameters(configuration);
- // 设置默认并行度
- env.setParallelism(1);
- // 任务失败后的重启策略
- // env.setRestartStrategy(RestartStrategies.noRestart());// 失败不重启
- // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 3:最大重试次数、10:重启间隔时间
- env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));// 5分钟内最多重启 3次,重启间隔时间:10s
- // checkpoint配置
- env.enableCheckpointing(appConfigParam.getInterval() * 1000, CheckpointingMode.EXACTLY_ONCE);
- // checkpoint执行超时时间,超时则 checkpoint失败
- env.getCheckpointConfig().setCheckpointTimeout(appConfigParam.getTimeout() * 1000);
- // checkpoint执行最小间隔时间
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(appConfigParam.getMinBetween() * 1000);
- // 允许并行执行 checkpoint个数,当指定了 minPauseBetweenCheckpoints时,其值无效(就是 1)
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- // 开启 checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state。
- // ExternalizedCheckpointCleanup用于指定当job canceled的时候外部的 checkpoint该如何清理
- // DELETE_ON_CANCELLATION: 在job canceled的时候会自动删除外部 state,但是如果是FAILED的状态则会保留
- // RETAIN_ON_CANCELLATION:在job canceled的时候保留外部 state
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- // 设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
- env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
- // 设置状态后端
- env.setStateBackend(new HashMapStateBackend());
- // 设置检查点目录
- if (StringUtils.isNotBlank(appConfigParam.getSavePath())) {
- env.getCheckpointConfig().setCheckpointStorage(appConfigParam.getSavePath());
- }
- MysqlConfigParam qcMysqlConfig = MysqlConfigParam.builder()
- .url(props.getProperty("mysql.quChengText.url"))
- .username(props.getProperty("mysql.quChengText.username"))
- .password(props.getProperty("mysql.quChengText.password"))
- .build();
- MySqlSource<TransportMap> mysqlCDCSource = MySqlSource.<TransportMap>builder()
- .hostname(props.getProperty("cdc.mysql.backup.host"))
- .port(StringUtils.isBlank(props.getProperty("cdc.mysql.backup.port")) ? 3306 : Integer.parseInt(props.getProperty("cdc.mysql.backup.port")))
- .username(props.getProperty("cdc.mysql.backup.username"))
- .password(props.getProperty("cdc.mysql.backup.password"))
- .databaseList("ods")
- .tableList((StringUtils.join(new String[]{
- "ods.byte_t_ad_data_day",
- "ods.t_gdt_adgroups_data_day",
- }, ",")))
- .deserializer(new MapDebeziumDeserializationSchema())
- //5400 和 6400
- .startupOptions(StartupOptions.initial())
- .build();
- SingleOutputStreamOperator<TransportMap> out = env.fromSource(mysqlCDCSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source")
- .flatMap(new FieldFilterMap());
- out.addSink(new CostSink(qcMysqlConfig));
- env.execute(DailyCost.class.getSimpleName());
- }
- public static class FieldFilterMap extends RichFlatMapFunction<TransportMap, TransportMap> {
- private static final BigDecimal NUM_100 = new BigDecimal(100);
- @Override
- public void flatMap(TransportMap transportMap, Collector<TransportMap> out) throws Exception {
- String dbName = transportMap.getDbName();
- String tableName = transportMap.getTableName();
- List<String> primaryKeys = transportMap.getPrimaryKeys();
- Envelope.Operation operation = transportMap.getOperation();
- Map<String, Object> data = transportMap.getAfter();
- if (operation == Envelope.Operation.DELETE) {
- data = transportMap.getBefore();
- } else {
- data = transportMap.getAfter();
- }
- if (dbName.equals("ods")) {
- if (tableName.equals("byte_t_ad_data_day")) {
- oceanengineCost(operation, data, out);
- } else if (tableName.equals("t_mp_adgroups_data_day")) {
- tencentMpCost(operation, data, out);
- } else if (tableName.equals("t_gdt_adgroups_data_day")) {
- tencentGdtCost(operation, data, out);
- }
- }
- }
- private void oceanengineCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
- Long accountId = ObjectUtil.objToLong(data.get("account_id"));
- Map<String, Object> result = new HashMap<>();
- result.put("account_id", accountId.toString());
- result.put("ad_id", ObjectUtil.objToLong(data.get("ad_id")));
- result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
- result.put("cost", ObjectUtil.objToBigDecimal(data.get("cost")).multiply(NUM_100).intValue());
- result.put("view_count", ObjectUtil.objToInteger(data.get("show_count")));
- result.put("valid_click_count", ObjectUtil.objToInteger(data.get("click")));
- result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
- result.put("from_follow_uv", ObjectUtil.objToInteger(data.get("active")));
- result.put("official_account_follow_rate", null);
- result.put("order_amount", null);
- result.put("order_roi", null);
- result.put("order_count", null);
- result.put("order_rate", null);
- result.put("order_unit_price", null);
- result.put("web_order_cost", null);
- result.put("first_day_order_amount", null);
- result.put("first_day_order_count", null);
- result.put("belong_version", ObjectUtil.objToInteger(data.get("belong_version")));
- out.collect(TransportMap.builder()
- .dbName("quchen_text")
- .tableName("daily_tt")
- .primaryKeys(Arrays.asList("account_id", "ad_id", "date"))
- .operation(operation)
- .after(result)
- .build());
- }
- private void tencentGdtCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
- Long accountId = ObjectUtil.objToLong(data.get("account_id"));
- Map<String, Object> result = new HashMap<>();
- result.put("account_id", accountId.toString());
- result.put("adgroup_id", ObjectUtil.objToLong(data.get("adgroup_id")));
- result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
- result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
- result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
- result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
- result.put("cpc", ObjectUtil.objToInteger(data.get("cpc")));
- result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
- result.put("web_order_count", null);
- result.put("web_order_rate", null);
- result.put("web_order_cost", null);
- result.put("follow_count", ObjectUtil.objToInteger(data.get("biz_follow_count")));
- result.put("order_amount", null);
- result.put("order_roi", null);
- result.put("platform_page_view_count", null);
- result.put("web_commodity_page_view_count", null);
- result.put("from_follow_uv", ObjectUtil.objToInteger(data.get("from_follow_uv")));
- out.collect(TransportMap.builder()
- .dbName("quchen_text")
- .tableName("daily_qq")
- .primaryKeys(Arrays.asList("account_id", "adgroup_id", "date"))
- .operation(operation)
- .after(result)
- .build());
- }
- private void tencentMpCost(Envelope.Operation operation, Map<String, Object> data, Collector<TransportMap> out) {
- Long accountId = ObjectUtil.objToLong(data.get("account_id"));
- Map<String, Object> result = new HashMap<>();
- result.put("account_id", accountId.toString());
- result.put("adgroup_id", ObjectUtil.objToLong(data.get("adgroup_id")));
- result.put("date", LocalDateTime.of(ObjectUtil.objToLocalDate(data.get("day")), LocalTime.MIDNIGHT));
- result.put("cost", ObjectUtil.objToInteger(data.get("cost")));
- result.put("view_count", ObjectUtil.objToInteger(data.get("view_count")));
- result.put("valid_click_count", ObjectUtil.objToInteger(data.get("valid_click_count")));
- result.put("ctr", ObjectUtil.objToBigDecimal(data.get("ctr")));
- result.put("official_account_follow_rate", ObjectUtil.objToBigDecimal(data.get("official_account_follow_rate")));
- result.put("order_amount", null);
- result.put("order_roi", null);
- result.put("order_count", null);
- result.put("order_rate", null);
- result.put("order_unit_price", null);
- result.put("web_order_cost", null);
- result.put("first_day_order_amount", null);
- result.put("first_day_order_count", null);
- out.collect(TransportMap.builder()
- .dbName("quchen_text")
- .tableName("daily_vx")
- .primaryKeys(Arrays.asList("account_id", "adgroup_id", "date"))
- .operation(operation)
- .after(result)
- .build());
- }
- }
- }
|