|
@@ -1,8 +1,9 @@
|
|
package flink.zanxiangnet.ad.monitoring;
|
|
package flink.zanxiangnet.ad.monitoring;
|
|
|
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
|
|
|
|
-import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
|
|
|
|
|
|
+import com.zanxiangnet.module.util.DateUtil;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.Builder;
|
|
import lombok.Builder;
|
|
import lombok.Data;
|
|
import lombok.Data;
|
|
@@ -11,13 +12,13 @@ import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
|
|
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
-import org.apache.flink.api.common.state.ListState;
|
|
|
|
-import org.apache.flink.api.common.state.ListStateDescriptor;
|
|
|
|
|
|
+import org.apache.flink.api.common.state.*;
|
|
import org.apache.flink.api.common.typeinfo.Types;
|
|
import org.apache.flink.api.common.typeinfo.Types;
|
|
import org.apache.flink.configuration.Configuration;
|
|
import org.apache.flink.configuration.Configuration;
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
|
+import org.apache.flink.streaming.api.functions.ProcessFunction;
|
|
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
|
|
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
|
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
|
@@ -30,6 +31,7 @@ import org.apache.flink.util.Collector;
|
|
import java.time.Duration;
|
|
import java.time.Duration;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
@@ -41,14 +43,14 @@ public class Test {
|
|
public static void main(String[] args) throws Exception {
|
|
public static void main(String[] args) throws Exception {
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
// 加载配置文件到 flink的全局配置中
|
|
// 加载配置文件到 flink的全局配置中
|
|
- Properties props = new Properties();
|
|
|
|
|
|
+ /*Properties props = new Properties();
|
|
props.load(Test.class.getResourceAsStream("/application.properties"));
|
|
props.load(Test.class.getResourceAsStream("/application.properties"));
|
|
Configuration configuration = new Configuration();
|
|
Configuration configuration = new Configuration();
|
|
props.stringPropertyNames().forEach(key -> {
|
|
props.stringPropertyNames().forEach(key -> {
|
|
String value = props.getProperty(key);
|
|
String value = props.getProperty(key);
|
|
configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
|
|
configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
|
|
});
|
|
});
|
|
- env.getConfig().setGlobalJobParameters(configuration);
|
|
|
|
|
|
+ env.getConfig().setGlobalJobParameters(configuration);*/
|
|
|
|
|
|
env.setParallelism(1);
|
|
env.setParallelism(1);
|
|
|
|
|
|
@@ -57,76 +59,108 @@ public class Test {
|
|
SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(3))
|
|
SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(3))
|
|
.withTimestampAssigner((SerializableTimestampAssigner<Pojo>) (pojo, l) -> pojo.getCreateTime())
|
|
.withTimestampAssigner((SerializableTimestampAssigner<Pojo>) (pojo, l) -> pojo.getCreateTime())
|
|
);
|
|
);
|
|
- pojoStream.keyBy(Pojo::getUserId)
|
|
|
|
|
|
+ SingleOutputStreamOperator<Pojo> temp = pojoStream.keyBy(Pojo::getUserId).window(TumblingEventTimeWindows.of(Time.hours(24))).process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
|
|
|
|
+ @Override
|
|
|
|
+ public void process(Integer integer, ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>.Context context, Iterable<Pojo> elements, Collector<Pojo> out) throws Exception {
|
|
|
|
+ System.out.println("2222begin: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(context.window().getStart())) + " | end: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(context.window().getEnd())));
|
|
|
|
+ for(Pojo pojo : elements) {
|
|
|
|
+ out.collect(pojo);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ new KeyedBatchStream<>(pojoStream, Pojo::getUserId, 10L, Time.seconds(10))
|
|
|
|
+ .toBatch()
|
|
|
|
+ .process(new ProcessFunction<List<Pojo>, String>() {
|
|
|
|
+ @Override
|
|
|
|
+ public void processElement(List<Pojo> value, ProcessFunction<List<Pojo>, String>.Context ctx, Collector<String> out) throws Exception {
|
|
|
|
+ out.collect("收到 " + value.size() + "个元素!!!");
|
|
|
|
+ }
|
|
|
|
+ }).print();
|
|
|
|
+ /*pojoStream.keyBy(Pojo::getUserId)
|
|
.window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8)))
|
|
.window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8)))
|
|
.trigger(new Trigger<Pojo, TimeWindow>() {
|
|
.trigger(new Trigger<Pojo, TimeWindow>() {
|
|
- /**
|
|
|
|
- *
|
|
|
|
- * @param pojo
|
|
|
|
- * @param time 触发窗口的时间,比如此例中是 eventTime
|
|
|
|
- * @param timeWindow
|
|
|
|
- * @param triggerContext
|
|
|
|
- * @return
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
|
|
+ *//**
|
|
|
|
+ *
|
|
|
|
+ * @param pojo
|
|
|
|
+ * @param time 触发窗口的时间,比如此例中是 eventTime
|
|
|
|
+ * @param timeWindow
|
|
|
|
+ * @param triggerContext
|
|
|
|
+ * @return
|
|
|
|
+ * @throws Exception
|
|
|
|
+ *//*
|
|
@Override
|
|
@Override
|
|
public TriggerResult onElement(Pojo pojo, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
public TriggerResult onElement(Pojo pojo, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
- /*log.error("trigger->onElement: {}, {}, {}, {}, [{} - {}]", JsonUtil.toString(pojo),
|
|
|
|
|
|
+ *//*log.error("trigger->onElement: {}, {}, {}, {}, [{} - {}]", JsonUtil.toString(pojo),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.getStart())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.getStart())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.getEnd())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.getEnd())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentWatermark())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentWatermark())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.maxTimestamp())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(timeWindow.maxTimestamp())),
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentProcessingTime()))
|
|
DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentProcessingTime()))
|
|
- );*/
|
|
|
|
|
|
+ );*//*
|
|
log.error("收到数据:{},eventTime:{}", DateUtil.milliToLocalDateTime(pojo.getCreateTime()), DateUtil.milliToLocalDateTime(time));
|
|
log.error("收到数据:{},eventTime:{}", DateUtil.milliToLocalDateTime(pojo.getCreateTime()), DateUtil.milliToLocalDateTime(time));
|
|
return TriggerResult.FIRE;
|
|
return TriggerResult.FIRE;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * ProcessingTime定时器触发的时候调用
|
|
|
|
- *
|
|
|
|
- * @param time 调用这个方法的时间
|
|
|
|
- * @param timeWindow
|
|
|
|
- * @param triggerContext
|
|
|
|
- * @return
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
|
|
+ *//**
|
|
|
|
+ * ProcessingTime定时器触发的时候调用
|
|
|
|
+ *
|
|
|
|
+ * @param time 调用这个方法的时间
|
|
|
|
+ * @param timeWindow
|
|
|
|
+ * @param triggerContext
|
|
|
|
+ * @return
|
|
|
|
+ * @throws Exception
|
|
|
|
+ *//*
|
|
@Override
|
|
@Override
|
|
public TriggerResult onProcessingTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
public TriggerResult onProcessingTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
log.error("trigger->onProcessingTime: {}", DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(time)));
|
|
log.error("trigger->onProcessingTime: {}", DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(time)));
|
|
return TriggerResult.CONTINUE;
|
|
return TriggerResult.CONTINUE;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * EventTime定时器触发的时候调用
|
|
|
|
- *
|
|
|
|
- * @param time eventTime
|
|
|
|
- * @param timeWindow
|
|
|
|
- * @param triggerContext
|
|
|
|
- * @return
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
|
|
+ *//**
|
|
|
|
+ * EventTime定时器触发的时候调用
|
|
|
|
+ *
|
|
|
|
+ * @param time eventTime
|
|
|
|
+ * @param timeWindow
|
|
|
|
+ * @param triggerContext
|
|
|
|
+ * @return
|
|
|
|
+ * @throws Exception
|
|
|
|
+ *//*
|
|
@Override
|
|
@Override
|
|
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
log.error("trigger->onEventTime: {}", DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(time)));
|
|
log.error("trigger->onEventTime: {}", DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(time)));
|
|
return TriggerResult.PURGE;
|
|
return TriggerResult.PURGE;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * 窗口处理结束后,清除窗口的时候调用
|
|
|
|
- *
|
|
|
|
- * @param timeWindow
|
|
|
|
- * @param triggerContext
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
|
|
+ *//**
|
|
|
|
+ * 窗口处理结束后,清除窗口的时候调用
|
|
|
|
+ *
|
|
|
|
+ * @param timeWindow
|
|
|
|
+ * @param triggerContext
|
|
|
|
+ * @throws Exception
|
|
|
|
+ *//*
|
|
@Override
|
|
@Override
|
|
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
|
|
log.error("trigger->clear");
|
|
log.error("trigger->clear");
|
|
}
|
|
}
|
|
}).process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
|
|
}).process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
|
|
|
|
|
|
|
|
+ private ValueState<String> valueState;
|
|
|
|
+ private ListState<String> listState;
|
|
|
|
+ private MapState<String, String> mapState;
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void open(Configuration conf) {
|
|
public void open(Configuration conf) {
|
|
|
|
+ StateTtlConfig ttlConfig =
|
|
|
|
+ StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.days(1)) //它是生存时间值
|
|
|
|
+ .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
|
|
|
|
+ .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
|
|
|
|
+ .cleanupFullSnapshot() // 在快照的时候进行删除
|
|
|
|
+ .build();
|
|
|
|
+ ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("valueState", Types.STRING);
|
|
|
|
+ valueStateDescriptor.enableTimeToLive(ttlConfig);
|
|
|
|
+ valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("valueState", Types.STRING));
|
|
|
|
+ listState = getRuntimeContext().getListState(new ListStateDescriptor<>("listState", Types.STRING));
|
|
|
|
+ mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapState", Types.STRING, Types.STRING));
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -137,7 +171,7 @@ public class Test {
|
|
collector.collect(pojo);
|
|
collector.collect(pojo);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- });//.print();
|
|
|
|
|
|
+ });*/
|
|
/*.aggregate(new AggregateFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, String>() {
|
|
/*.aggregate(new AggregateFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, String>() {
|
|
|
|
|
|
@Override
|
|
@Override
|