|
@@ -1,112 +1,105 @@
|
|
package flink.zanxiangnet.ad.monitoring;
|
|
package flink.zanxiangnet.ad.monitoring;
|
|
|
|
|
|
-import com.aliyun.odps.Instance;
|
|
|
|
-import com.aliyun.odps.Odps;
|
|
|
|
-import com.aliyun.odps.account.Account;
|
|
|
|
-import com.aliyun.odps.account.AliyunAccount;
|
|
|
|
-import com.aliyun.odps.data.Record;
|
|
|
|
-import com.aliyun.odps.task.SQLTask;
|
|
|
|
|
|
+import com.tencent.ads.model.DailyReportsGetListStruct;
|
|
import com.tencent.ads.model.HourlyReportsGetListStruct;
|
|
import com.tencent.ads.model.HourlyReportsGetListStruct;
|
|
-import flink.zanxiangnet.ad.monitoring.maxcompute.sink.MaxComputeSink;
|
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.AdInfo;
|
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.PlanInfo;
|
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.StatInfo;
|
|
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.maxcompute.sink.TunnelBatchSink;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.maxcompute.sink.TunnelStreamSink;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfDayDTO;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.properties.KafkaProperties;
|
|
import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
|
|
import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
|
|
import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfMinuteODS;
|
|
import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfMinuteODS;
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
|
|
|
|
import flink.zanxiangnet.ad.monitoring.util.DateUtil;
|
|
import flink.zanxiangnet.ad.monitoring.util.DateUtil;
|
|
import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
|
|
import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
|
|
-import flink.zanxiangnet.ad.monitoring.util.NumberUtil;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
|
|
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
-import org.apache.flink.api.common.functions.FilterFunction;
|
|
|
|
-import org.apache.flink.api.common.functions.ReduceFunction;
|
|
|
|
-import org.apache.flink.api.common.functions.RichMapFunction;
|
|
|
|
-import org.apache.flink.api.common.functions.RichReduceFunction;
|
|
|
|
import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
|
import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
|
-import org.apache.flink.api.java.functions.KeySelector;
|
|
|
|
import org.apache.flink.configuration.Configuration;
|
|
import org.apache.flink.configuration.Configuration;
|
|
import org.apache.flink.connector.kafka.source.KafkaSource;
|
|
import org.apache.flink.connector.kafka.source.KafkaSource;
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
|
|
|
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
|
|
|
|
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
|
|
|
-import org.apache.flink.streaming.api.windowing.time.Time;
|
|
|
|
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
|
|
|
-import org.apache.flink.util.Collector;
|
|
|
|
import org.apache.kafka.clients.CommonClientConfigs;
|
|
import org.apache.kafka.clients.CommonClientConfigs;
|
|
|
|
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
|
|
import org.apache.kafka.common.config.SaslConfigs;
|
|
import org.apache.kafka.common.config.SaslConfigs;
|
|
import org.apache.kafka.common.config.SslConfigs;
|
|
import org.apache.kafka.common.config.SslConfigs;
|
|
-import org.slf4j.Logger;
|
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
|
import org.springframework.beans.BeanUtils;
|
|
import org.springframework.beans.BeanUtils;
|
|
|
|
|
|
import java.time.Duration;
|
|
import java.time.Duration;
|
|
import java.time.LocalDateTime;
|
|
import java.time.LocalDateTime;
|
|
|
|
+import java.util.Date;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
|
|
|
|
public class KafkaDemoJob {
|
|
public class KafkaDemoJob {
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
public static void main(String[] args) throws Exception {
|
|
System.setProperty("javax.net.ssl.trustStore", "D:\\Downloads\\kafka.client.truststore.jks");
|
|
System.setProperty("javax.net.ssl.trustStore", "D:\\Downloads\\kafka.client.truststore.jks");
|
|
|
|
+ // System.setProperty("javax.net.ssl.trustStore", "/root/flink-1.13.2/kafka.client.truststore.jks");
|
|
System.setProperty("javax.net.ssl.trustStorePassword", "KafkaOnsClient");
|
|
System.setProperty("javax.net.ssl.trustStorePassword", "KafkaOnsClient");
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
|
|
|
|
|
+ // 加载配置文件到 flink的全局配置中
|
|
Properties props = new Properties();
|
|
Properties props = new Properties();
|
|
- // props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"alikafka_pre-cn-tl32fsx4l00x\" password=\"VOEdhZLjOrL76lrl5bqPtydtoEkbs0Ny\";");
|
|
|
|
- props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alikafka_pre-cn-tl32fsx4l00x\" password=\"VOEdhZLjOrL76lrl5bqPtydtoEkbs0Ny\";");
|
|
|
|
- props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Downloads\\kafka.client.truststore.jks");
|
|
|
|
- props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
|
|
|
|
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
|
|
|
|
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
|
|
|
|
- props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
|
|
|
|
- KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
|
|
|
|
- .setBootstrapServers("114.55.59.94:9093,112.124.33.132:9093")
|
|
|
|
- .setTopics("ad_cost_topic")
|
|
|
|
- .setGroupId("ad_cost_group")
|
|
|
|
- .setProperties(props)
|
|
|
|
- .setStartingOffsets(OffsetsInitializer.earliest())
|
|
|
|
- .setValueOnlyDeserializer(new SimpleStringSchema())
|
|
|
|
- .build();
|
|
|
|
|
|
+ props.load(KafkaDemoJob.class.getResourceAsStream("/application.properties"));
|
|
|
|
+ Configuration configuration = new Configuration();
|
|
|
|
+ props.stringPropertyNames().forEach(key -> {
|
|
|
|
+ String value = props.getProperty(key);
|
|
|
|
+ configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
|
|
|
|
+ });
|
|
|
|
+ env.getConfig().setGlobalJobParameters(configuration);
|
|
|
|
|
|
- /*DataStreamSource<String> in = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
|
|
|
|
|
|
+ Properties adStreamOfMinuteProps = new Properties();
|
|
|
|
+ adStreamOfMinuteProps.load(KafkaDemoJob.class.getResourceAsStream("/ad_stream_of_minute.properties"));
|
|
|
|
+ KafkaSource<String> adStreamOfMinuteSource = buildKafkaSource(adStreamOfMinuteProps);
|
|
|
|
|
|
- SingleOutputStreamOperator<String> adInfo = in.filter(StringUtils::isNotBlank).map(str -> {
|
|
|
|
- File file = new File("C:\\Users\\hi\\Desktop\\temp\\ff.txt");
|
|
|
|
- if (!file.exists()) {
|
|
|
|
- file.createNewFile();
|
|
|
|
- }
|
|
|
|
- FileWriter writer = new FileWriter(file, true);
|
|
|
|
- writer.write(str + "\r\n");
|
|
|
|
- writer.close();
|
|
|
|
- return str;
|
|
|
|
- });*/
|
|
|
|
|
|
+ DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adStreamOfMinuteSource_kafka");
|
|
|
|
|
|
- DataStreamSource<String> in = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "adDataOfMinuteSource_kafka");
|
|
|
|
-
|
|
|
|
- SingleOutputStreamOperator<AdDataOfMinuteODS> adOdsStream = in.filter(StringUtils::isNotBlank)
|
|
|
|
|
|
+ SingleOutputStreamOperator<AdDataOfMinuteODS> adMinuteOdsStream = adStreamOfMinuteIn.filter(StringUtils::isNotBlank)
|
|
.map(str -> {
|
|
.map(str -> {
|
|
AdDataOfMinuteDTO dto = JsonUtil.toObj(str, AdDataOfMinuteDTO.class);
|
|
AdDataOfMinuteDTO dto = JsonUtil.toObj(str, AdDataOfMinuteDTO.class);
|
|
LocalDateTime statTime = DateUtil.milliToLocalDateTime(dto.getCreateTime());
|
|
LocalDateTime statTime = DateUtil.milliToLocalDateTime(dto.getCreateTime());
|
|
|
|
+ HourlyReportsGetListStruct struct = dto.getHourlyReportsGetListStruct();
|
|
AdDataOfMinuteODS adOds = new AdDataOfMinuteODS();
|
|
AdDataOfMinuteODS adOds = new AdDataOfMinuteODS();
|
|
|
|
+ BeanUtils.copyProperties(struct, adOds);
|
|
adOds.setStatDay(DateUtil.formatLocalDate(statTime.toLocalDate()));
|
|
adOds.setStatDay(DateUtil.formatLocalDate(statTime.toLocalDate()));
|
|
adOds.setHour(dto.getHourlyReportsGetListStruct().getHour().intValue());
|
|
adOds.setHour(dto.getHourlyReportsGetListStruct().getHour().intValue());
|
|
adOds.setStatTime(dto.getCreateTime());
|
|
adOds.setStatTime(dto.getCreateTime());
|
|
adOds.setAccountId(dto.getAccountId());
|
|
adOds.setAccountId(dto.getAccountId());
|
|
adOds.setAgencyAccountId(dto.getHourlyReportsGetListStruct().getAccountId());
|
|
adOds.setAgencyAccountId(dto.getHourlyReportsGetListStruct().getAccountId());
|
|
- BeanUtils.copyProperties(dto.getHourlyReportsGetListStruct(), adOds);
|
|
|
|
return adOds;
|
|
return adOds;
|
|
})
|
|
})
|
|
// 打水印,延迟 12分钟,同时指定时间流
|
|
// 打水印,延迟 12分钟,同时指定时间流
|
|
.assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(12L))
|
|
.assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(12L))
|
|
.withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adOds, l) -> adOds.getStatTime())
|
|
.withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adOds, l) -> adOds.getStatTime())
|
|
);
|
|
);
|
|
|
|
+ adMinuteOdsStream.addSink(new TunnelBatchSink<>(AdDataOfMinuteODS.class, 36000L, 64000L, 3));
|
|
|
|
|
|
- adOdsStream.addSink(new MaxComputeSink<>(AdDataOfMinuteODS.class, "statDay"));
|
|
|
|
|
|
+ Properties adStreamOfDayProps = new Properties();
|
|
|
|
+ adStreamOfDayProps.load(KafkaDemoJob.class.getResourceAsStream("/ad_stream_of_day.properties"));
|
|
|
|
+ KafkaSource<String> adStreamOfDaySource = buildKafkaSource(adStreamOfDayProps);
|
|
|
|
+
|
|
|
|
+ DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adStreamOfMinuteSource_kafka");
|
|
|
|
+
|
|
|
|
+ SingleOutputStreamOperator<AdDataOfDayODS> adDayOdsStream = adStreamOfDayIn.filter(StringUtils::isNotBlank)
|
|
|
|
+ .map(str -> {
|
|
|
|
+ AdDataOfDayDTO dto = JsonUtil.toObj(str, AdDataOfDayDTO.class);
|
|
|
|
+ Date createTime = new Date(dto.getCreateTime());
|
|
|
|
+ DailyReportsGetListStruct struct = dto.getDailyReportsGetListStruct();
|
|
|
|
+ AdDataOfDayODS adOds = new AdDataOfDayODS();
|
|
|
|
+ BeanUtils.copyProperties(struct, adOds);
|
|
|
|
+ adOds.setStatDay(struct.getDate());
|
|
|
|
+ adOds.setAccountId(dto.getAccountId());
|
|
|
|
+ adOds.setCampaignId(struct.getCampaignId());
|
|
|
|
+ adOds.setAgencyAccountId(struct.getAccountId());
|
|
|
|
+ adOds.setWechatAccountId(struct.getWechatAccountId());
|
|
|
|
+ adOds.setAdgroupId(struct.getAdgroupId());
|
|
|
|
+ adOds.setAdId(struct.getAdId());
|
|
|
|
+ adOds.setCreateTime(createTime);
|
|
|
|
+ return adOds;
|
|
|
|
+ });
|
|
|
|
+ adDayOdsStream.addSink(new TunnelBatchSink<>(AdDataOfDayODS.class, 36000L, 6000L, 200));
|
|
|
|
|
|
// 获取指定广告的历史统计信息(用于统计总的消耗信息等)
|
|
// 获取指定广告的历史统计信息(用于统计总的消耗信息等)
|
|
/*SingleOutputStreamOperator<AdStatOfHourDWD> oldAdStream = adOdsStream.map(new RichMapFunction<AdDataOfMinuteODS, AdStatOfHourDWD>() {
|
|
/*SingleOutputStreamOperator<AdStatOfHourDWD> oldAdStream = adOdsStream.map(new RichMapFunction<AdDataOfMinuteODS, AdStatOfHourDWD>() {
|
|
@@ -119,7 +112,7 @@ public class KafkaDemoJob {
|
|
// odps.setEndpoint("http://service.odps.aliyun.com/api");
|
|
// odps.setEndpoint("http://service.odps.aliyun.com/api");
|
|
odps.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api");
|
|
odps.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api");
|
|
// http://dt.cn-hangzhou.maxcompute.aliyun.com
|
|
// http://dt.cn-hangzhou.maxcompute.aliyun.com
|
|
- odps.setDefaultProject("ZxData_dev");
|
|
|
|
|
|
+ odps.setDefaultProject("zx_ad_monitoring");
|
|
}
|
|
}
|
|
@Override
|
|
@Override
|
|
public AdStatOfHourDWD map(AdDataOfMinuteODS adDataOfMinuteODS) throws Exception {
|
|
public AdStatOfHourDWD map(AdDataOfMinuteODS adDataOfMinuteODS) throws Exception {
|
|
@@ -272,4 +265,32 @@ public class KafkaDemoJob {
|
|
// planStatStream.addSink(new PrintSink<>()).name("msg-print");
|
|
// planStatStream.addSink(new PrintSink<>()).name("msg-print");
|
|
env.execute();
|
|
env.execute();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static KafkaSource<String> buildKafkaSource(Properties props) {
|
|
|
|
+ return buildKafkaSource(props.getProperty(KafkaProperties.KAFKA_SERVERS),
|
|
|
|
+ props.getProperty(KafkaProperties.KAFKA_USERNAME),
|
|
|
|
+ props.getProperty(KafkaProperties.KAFKA_PASSWORD),
|
|
|
|
+ props.getProperty(KafkaProperties.KAFKA_SSL_PATH),
|
|
|
|
+ props.getProperty(KafkaProperties.KAFKA_TOPIC),
|
|
|
|
+ props.getProperty(KafkaProperties.KAFKA_GROUP_ID)
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static KafkaSource<String> buildKafkaSource(String servers, String username, String password, String sslPath, String topic, String groupId) {
|
|
|
|
+ Properties props = new Properties();
|
|
|
|
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
|
|
|
|
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslPath);
|
|
|
|
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
|
|
|
|
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
|
|
|
|
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
|
|
|
|
+ props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
|
|
|
|
+ return KafkaSource.<String>builder()
|
|
|
|
+ .setBootstrapServers(servers)
|
|
|
|
+ .setTopics(topic)
|
|
|
|
+ .setGroupId(groupId)
|
|
|
|
+ .setProperties(props)
|
|
|
|
+ .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
|
|
|
|
+ .setValueOnlyDeserializer(new SimpleStringSchema())
|
|
|
|
+ .build();
|
|
|
|
+ }
|
|
}
|
|
}
|