wcc 3 years ago
Commit
6975c59082

+ 64 - 154
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -6,16 +6,13 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
-import com.tencent.ads.model.DailyReportsGetListStruct;
 import com.tencent.ads.model.HourlyReportsGetListStruct;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
-import flink.zanxiangnet.ad.monitoring.maxcompute.sink.TunnelBatchSink;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfDayDTO;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.KafkaProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
+import flink.zanxiangnet.ad.monitoring.process.AdMinuteDWDProcess;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
@@ -23,7 +20,6 @@ import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@@ -35,6 +31,8 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -130,129 +128,63 @@ public class AdStatJob {
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
        // write to the raw table
         // adMinuteODSStream.addSink(new TunnelBatchSink<>(AdDataOfMinuteODS.class, 36000L, 64000L, 3));
-        new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 4000L, 60 * 1000L)
-                .toBatch().addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class));
+        new KeyedBatchStream<>("adMinuteODSStream", adMinuteODSStream.keyBy(AdDataOfMinuteODS::getStatDay), 4000L, 2 * 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdDataOfMinuteODS.class));
 
-        adMinuteODSStream
+        SingleOutputStreamOperator<AdStatOfMinuteDWD> adMinuteDWDStream = adMinuteODSStream
                // assign watermarks: allow events up to 1 minute late, and set the event-time field
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(6L))
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                         .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adODS, l) -> adODS.getStatTime()))
                 .keyBy(AdDataOfMinuteODS::getAdId)
                // open a 5-minute tumbling window
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
-                .process(new ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>() {
-                    // object lock, prevents the MaxCompute Tunnel object from being initialized multiple times
-                    private final String DUMMY_LOCK = "DUMMY_LOCK";
-                    private Odps odps;
-                    private long lastClearTime = System.currentTimeMillis();
-                    private final Map<Long, String> lastAdDayQueryDay = new ConcurrentHashMap<>();
-                    // historical daily data
-                    private final Map<Long, AdStatOfDayDWD> historyDayData = new ConcurrentHashMap<>();
-                    // previously aggregated data for yesterday
-                    private final Map<Long, Map<String, AdStatOfMinuteDWD>> historyMinuteData = new ConcurrentHashMap<>();
-                    // data aggregated in the previous 5 minutes
-                    private final Map<Long, Map<String, AdStatOfMinuteDWD>> lastReduceData = new ConcurrentHashMap<>();
-
-                    private void clearData() {
-                        lastAdDayQueryDay.entrySet().removeIf(entry -> DateUtil.parseLocalDate(entry.getValue()).compareTo(LocalDate.now().minusDays(3L)) <= 0);
-                        historyDayData.entrySet().removeIf(entry -> DateUtil.parseLocalDate(entry.getValue().getStatDay()).compareTo(LocalDate.now().minusDays(50L)) <= 0);
-                        historyMinuteData.forEach((key, value) -> {
-                            if (value != null) {
-                                value.entrySet().removeIf(entry2 -> DateUtil.parseLocalDate(entry2.getKey()).compareTo(LocalDate.now().minusDays(3L)) <= 0);
-                            }
-                        });
-                        historyMinuteData.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
-
-                        lastReduceData.forEach((key, value) -> {
-                            if (value != null) {
-                                value.entrySet().removeIf(entry -> DateUtil.parseOfMinuteReduce(entry.getKey()).compareTo(LocalDateTime.now().minusHours(1)) <= 0);
-                            }
-                        });
-                        lastReduceData.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
+                .trigger(new Trigger<AdDataOfMinuteODS, TimeWindow>() {
+                    @Override
+                    public TriggerResult onElement(AdDataOfMinuteODS adDataOfMinuteODS, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+                        System.out.println("eventTime:" + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(time)) +
+                                " | watermark time: " + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentWatermark())));
+                        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
+                            // the window has reached its maximum lifetime
+                            return TriggerResult.FIRE_AND_PURGE;
+                        }
+                        triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
+                        if (adDataOfMinuteODS.getHour() == DateUtil.milliToLocalDateTime(time).getHour()) {
+                            return TriggerResult.FIRE;
+                        }
+                        return TriggerResult.CONTINUE;
                     }
 
                     @Override
-                    public void open(Configuration conf) {
-                        Map<String, String> params = getRuntimeContext()
-                                .getExecutionConfig()
-                                .getGlobalJobParameters()
-                                .toMap();
-                        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
-                                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
-                        odps = new Odps(account);
-                        odps.getRestClient().setRetryLogger(new MaxComputeLog());
-                        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
-                        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+                    public TriggerResult onProcessingTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+                        return TriggerResult.CONTINUE;
                     }
 
                     @Override
-                    public void process(Long elementCount, ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context,
-                                        Iterable<AdDataOfMinuteODS> iterable, Collector<AdStatOfMinuteDWD> collector) throws Exception {
-                        LocalDateTime beginTime = DateUtil.milliToLocalDateTime(context.window().getStart());
-                        String statDay = DateUtil.formatLocalDate(beginTime.toLocalDate());
-                        String reduceTimeKey = DateUtil.formatOfMinuteReduce(beginTime);
-                        int hour = beginTime.getHour();
-                        long now = System.currentTimeMillis();
-                        AdDataOfMinuteODS statODS = null;
-                        List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
-                        for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {
-                            adDataOfMinuteODSList.add(adDataOfMinuteODS);
-                            if (adDataOfMinuteODS.getHour() != hour) {
-                                continue;
-                            }
-                            statODS = adDataOfMinuteODS;
-                            break;
-                        }
-                        if (statODS == null) {
-                            return;
-                        }
+                    public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+                        return time == timeWindow.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
+                    }
 
-                        // clear cached data every 12 hours
-                        if (now - lastClearTime > 12 * 60 * 60 * 1000) {
-                            synchronized (DUMMY_LOCK) {
-                                if (now - lastClearTime > 12 * 60 * 60 * 1000) {
-                                    clearData();
-                                    lastClearTime = now;
-                                }
-                            }
-                        }
-                        Long adId = statODS.getAdId();
-
-                        // yesterday's aggregated data
-                        Map<String, AdStatOfMinuteDWD> historyMinuteMapping = historyMinuteData.computeIfAbsent(adId, key -> new HashMap<>());
-                        AdStatOfMinuteDWD yesterdayMinuteDWD = historyMinuteMapping.get(DateUtil.formatLocalDate(beginTime.minusDays(1L).toLocalDate()));
-                        // earlier data
-                        String lastQueryDay = lastAdDayQueryDay.get(adId);
-                        if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
-                            // look back 30 days
-                            LocalDate endDay = beginTime.minusDays(2L).toLocalDate(), beginDay = endDay.minusDays(30L);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND ad_id = " + adId + ";");
-                            instance.waitForSuccess();
-                            List<Record> records = SQLTask.getResult(instance);
-                            List<AdStatOfDayDWD> temp = records.stream().map(AdStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {
-                                if (val1.getStatDay().equals(val2.getStatDay())) {
-                                    return new Long(val1.getCreateTime().getTime() - val2.getCreateTime().getTime()).intValue();
-                                }
-                                return DateUtil.parseLocalDate(val1.getStatDay()).compareTo(DateUtil.parseLocalDate(val2.getStatDay()));
-                            }).collect(Collectors.toList());
-                            historyDayData.put(adId, temp.isEmpty() ? null : temp.get(temp.size() - 1));
-                            lastAdDayQueryDay.put(adId, statDay);
-                        }
-                        AdStatOfDayDWD beforeYesterdayDayDWD = historyDayData.get(adId);
-                        Map<String, AdStatOfMinuteDWD> lastReduceMapping = lastReduceData.computeIfAbsent(adId, key -> new HashMap<>());
-                        // aggregate all of the day's data
-                        AdStatOfMinuteDWD nowAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceMapping.get(DateUtil.formatOfMinuteReduce(beginTime.minusMinutes(5L))), now);
-                        collector.collect(nowAdStat);
-                        lastReduceMapping.put(reduceTimeKey, nowAdStat);
-                        historyMinuteMapping.put(statDay, nowAdStat);
+                    @Override
+                    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+                        triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
                     }
-                }).addSink(new TunnelBatchSink<>(AdStatOfMinuteDWD.class, 30000L, 365L, 6));
+                })
+                .process(new AdMinuteDWDProcess());
+        // .addSink(new TunnelBatchSink<>(AdStatOfMinuteDWD.class, 30000L, 365L, 6));
+        new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 4000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class));
 
        // hourly stream (written directly to the hourly report ODS)
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
        // write to the raw table
-        adHourODSStream.addSink(new TunnelBatchSink<>(AdDataOfHourODS.class, 36000L, 64000L, 10));
-        adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
+        // adHourODSStream.addSink(new TunnelBatchSink<>(AdDataOfHourODS.class, 36000L, 64000L, 10));
+        new KeyedBatchStream<>("adHourODSStream", adHourODSStream.keyBy(AdDataOfHourODS::getStatDay), 4000L, 5 * 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class));
+
+        SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream = adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
                 .countWindow(1).process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
                     private Odps odps;
                     private final Map<Long, Long> lastAdQueryTime = new ConcurrentHashMap<>();
@@ -324,48 +256,15 @@ public class AdStatJob {
                         hourDataMapping.put(newStatData.getHour(), newStatData);
                         collector.collect(newStatData);
                     }
-                }).addSink(new TunnelBatchSink<>(AdStatOfHourDWD.class, 30000L, 365L, 6));
-
-        // fetch the ad's historical stats (used to compute totals such as overall spend)
-        /*SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream = adHourODSStream.map(AdStatOfHourDWD::byODS)
-                .keyBy((KeySelector<AdStatOfHourDWD, String>) adHourDWD -> adHourDWD.getStatDay() + "_" + adHourDWD.getAdId())
-                // open a daily tumbling window
-                .window(TumblingEventTimeWindows.of(Time.days(1L)))
-                // aggregate the data
-                .reduce(new RichReduceFunction<AdStatOfHourDWD>() {
-                    // initialize the MaxCompute connection instance
-                    private Odps odps;
-
-                    @Override
-                    public void open(Configuration parameters) throws Exception {
-                        Map<String, String> params = getRuntimeContext()
-                                .getExecutionConfig()
-                                .getGlobalJobParameters()
-                                .toMap();
-                        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
-                                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
-                        Odps odps = new Odps(account);
-                        odps.getRestClient().setRetryLogger(new MaxComputeLog());
-                        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
-                        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
-                    }
-
-                    @Override
-                    public AdStatOfHourDWD reduce(AdStatOfHourDWD adStatOfHourDWD, AdStatOfHourDWD t1) {
-                        Long accountId = adStatOfHourDWD.getAccountId();
-                        Long campaignId = adStatOfHourDWD.getCampaignId();
-                        Long adId = adStatOfHourDWD.getAdId();
-                        String statDay = adStatOfHourDWD.getStatDay();
-                        // 1. fetch all de-duplicated daily stats prior to this hour from MaxCompute and aggregate them into totals
-                        // 2. aggregate all data in the current stream into the day's data
-                        // 3. write the result out to MaxCompute
-                        return null;
-                    }
-                });*/
+                });
+        //.addSink(new TunnelBatchSink<>(AdStatOfHourDWD.class, 30000L, 365L, 6));
+        new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class));
 
 
        // ------------------------------------------------------- process the ad daily data -----------------------------------------
-        Properties adStreamOfDayProps = new Properties();
+        /*Properties adStreamOfDayProps = new Properties();
         adStreamOfDayProps.load(AdStatJob.class.getResourceAsStream("/ad_stream_of_day.properties"));
         KafkaSource<String> adStreamOfDaySource = buildKafkaSource(adStreamOfDayProps);
 
@@ -398,7 +297,10 @@ public class AdStatJob {
                             .adDataOfDayODS(adODS)
                             .build();
                 });
-        adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).addSink(new TunnelBatchSink<>(AdDataOfDayODS.class, 36000L, 6000L, 10));
+        // adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).addSink(new TunnelBatchSink<>(AdDataOfDayODS.class, 36000L, 6000L, 10));
+        new KeyedBatchStream<>("adDayODSStream", adDayODSStream.map(AdStatOfDayODSDTO::getAdDataOfDayODS).keyBy(AdDataOfDayODS::getStatDay), 4000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdDataOfDayODS.class));
 
        // split the stream
         SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStreamSplit = adDayODSStream.process(new ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>() {
@@ -417,7 +319,7 @@ public class AdStatJob {
             }
         });
 
-        adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
+        SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
                // assign watermarks allowing up to 10 days of lateness (because rollback of up to 10 days is allowed), and set the event time
                 .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfDayODS>forBoundedOutOfOrderness(Duration.ofDays(10L))
                         .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfDayODS>) (adDay, l) -> DateUtil.localDateToMilli(DateUtil.parseLocalDate(adDay.getStatDay())))
@@ -479,9 +381,13 @@ public class AdStatJob {
                         historyData.put(newStatData.getStatDay(), newStatData);
                         collector.collect(newStatData);
                     }
-                }).addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
+                });
+        //.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
+        new KeyedBatchStream<>("adDayDWDStream", adDayDWDStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));
 
-        adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
+        SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
                 .keyBy((KeySelector<AdDataOfDayODS, Long>) AdDataOfDayODS::getAdId)
                 .countWindow(1).process(new ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>() {
                     // 上次聚合的结果
@@ -493,7 +399,11 @@ public class AdStatJob {
                         lastReduce = AdStatOfDayDWD.reduce(lastReduce, element, System.currentTimeMillis());
                         out.collect(lastReduce);
                     }
-                }).addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
+                });
+        //.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
+        new KeyedBatchStream<>("adDayDWDYearStream", adDayDWDYearStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));*/
 
         env.execute();
     }
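
Note on the custom trigger above: FIRE emits the window's current contents while keeping its state; FIRE_AND_PURGE emits and then clears it. The 5-minute window therefore produces partial results as records of the current hour arrive, and a final result once the watermark passes the window end. The same pattern with the job-specific hour check stripped out, as an illustrative sketch (not code from this repository):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch: fire early on every element, then fire-and-purge when the watermark passes the window end.
public class EarlyFiringTrigger<T> extends Trigger<T, TimeWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE_AND_PURGE; // window is already complete
        }
        ctx.registerEventTimeTimer(window.maxTimestamp()); // final firing at window end
        return TriggerResult.FIRE; // early partial result
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be wired in the same way as above, via .window(TumblingEventTimeWindows.of(Time.minutes(5L))).trigger(new EarlyFiringTrigger<>()).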

+ 15 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -11,6 +11,9 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -21,6 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.table.planner.expressions.In;
 import org.apache.flink.util.Collector;
 
 import java.time.Duration;
@@ -77,18 +81,27 @@ public class Test {
                 .process(new ProcessWindowFunction<Pojo, String, Integer, GlobalWindow>() {
                     private final Map<Integer, Tuple3<LocalDateTime, LocalDateTime, List<Pojo>>> oldData = new ConcurrentHashMap<>();
 
+                    private MapState<Integer, Tuple3<LocalDateTime, LocalDateTime, List<Pojo>>> mapState;
+                    @Override
+                    public void open(Configuration conf) {
+                        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("testState", Types.INT, Types.TUPLE(Types.LOCAL_DATE_TIME, Types.LOCAL_DATE_TIME, Types.LIST(Types.POJO(Pojo.class)))));
+                    }
+
                     @Override
                     public void process(Integer integer, ProcessWindowFunction<Pojo, String, Integer, GlobalWindow>.Context context,
                                         Iterable<Pojo> elements, Collector<String> out) throws Exception {
                         Pojo pojo = elements.iterator().next();
                         LocalDateTime createTime = DateUtil.milliToLocalDateTime(pojo.getCreateTime());
-                        Tuple3<LocalDateTime, LocalDateTime, List<Pojo>> temp = oldData.get(pojo.getUserId());
+                        // Tuple3<LocalDateTime, LocalDateTime, List<Pojo>> temp = oldData.get(pojo.getUserId());
+                        Tuple3<LocalDateTime, LocalDateTime, List<Pojo>> temp = mapState.get(pojo.getUserId());
+                        System.out.println(mapState.get(1));
+                        System.out.println(mapState.get(2));
                         if (temp == null || createTime.getDayOfYear() != temp.f1.getDayOfYear()) {
                             temp = new Tuple3<>(createTime, createTime, new ArrayList<>(200));
                         }
                         temp.f2.add(pojo);
                         temp.setField(createTime, 1);
-                        oldData.put(pojo.getUserId(), temp);
+                        mapState.put(pojo.getUserId(), temp);
                         out.collect(JsonUtil.toString(temp.f2.stream().map(Pojo::getIndex).collect(Collectors.toList())));
                     }
                 });
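
The switch above trades the operator-local ConcurrentHashMap for Flink's keyed MapState, which is checkpointed with the job and automatically scoped to the current key, so the cached tuples survive restarts. A minimal self-contained sketch of the same idea (class and field names here are hypothetical):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per day for each key; the map lives in keyed state, not on the operator heap.
public class DailyCounter extends RichFlatMapFunction<Tuple2<String, String>, Long> {

    private transient MapState<String, Long> countsPerDay;

    @Override
    public void open(Configuration conf) {
        // State is scoped to the current key automatically and restored from checkpoints.
        countsPerDay = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsPerDay", Types.STRING, Types.LONG));
    }

    @Override
    public void flatMap(Tuple2<String, String> value, Collector<Long> out) throws Exception {
        Long current = countsPerDay.get(value.f1); // null if this day is unseen for the key
        long updated = current == null ? 1L : current + 1L;
        countsPerDay.put(value.f1, updated);
        out.collect(updated);
    }
}

Wired up on a keyed stream, e.g. stream.keyBy(t -> t.f0).flatMap(new DailyCounter()).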

+ 150 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -0,0 +1,150 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfMinuteODS;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow> {
+    // object lock guarding the periodic cache cleanup below (double-checked locking)
+    private static final Object DUMMY_LOCK = new Object();
+    private Odps odps;
+    private long lastClearTime = System.currentTimeMillis();
+    private final Map<Long, String> lastAdDayQueryDay = new ConcurrentHashMap<>();
+    // historical daily data
+    private final Map<Long, AdStatOfDayDWD> historyDayData = new ConcurrentHashMap<>();
+    // previously aggregated data for yesterday
+    private final Map<Long, Map<String, AdStatOfMinuteDWD>> historyMinuteData = new ConcurrentHashMap<>();
+    // data aggregated in the previous 5 minutes
+    private final Map<Long, Map<String, AdStatOfMinuteDWD>> lastReduceData = new ConcurrentHashMap<>();
+
+    @Override
+    public void open(Configuration conf) {
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
+        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
+                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
+        odps = new Odps(account);
+        odps.getRestClient().setRetryLogger(new MaxComputeLog());
+        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
+        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+    }
+
+    @Override
+    public void process(Long elementCount, ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context,
+                        Iterable<AdDataOfMinuteODS> iterable, Collector<AdStatOfMinuteDWD> collector) throws Exception {
+        MapState<String, AdStatOfMinuteDWD> historyMinuteState = context.windowState().getMapState(new MapStateDescriptor<>("historyMinuteData", String.class, AdStatOfMinuteDWD.class));
+        LocalDateTime beginTime = DateUtil.milliToLocalDateTime(context.window().getStart());
+        String statDay = DateUtil.formatLocalDate(beginTime.toLocalDate());
+        String reduceTimeKey = DateUtil.formatOfMinuteReduce(beginTime);
+        int hour = beginTime.getHour();
+        long now = System.currentTimeMillis();
+        AdDataOfMinuteODS statODS = null;
+        List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
+        for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {
+            adDataOfMinuteODSList.add(adDataOfMinuteODS);
+            if (adDataOfMinuteODS.getHour() != hour) {
+                continue;
+            }
+            statODS = adDataOfMinuteODS;
+            break;
+        }
+        System.out.println("windowCount:" + adDataOfMinuteODSList.size());
+        if (statODS == null) {
+            return;
+        }
+
+        // clear cached data every 12 hours
+        if (now - lastClearTime > 12 * 60 * 60 * 1000) {
+            synchronized (DUMMY_LOCK) {
+                if (now - lastClearTime > 12 * 60 * 60 * 1000) {
+                    clearData();
+                    lastClearTime = now;
+                }
+            }
+        }
+        Long adId = statODS.getAdId();
+
+        // yesterday's aggregated data
+        Map<String, AdStatOfMinuteDWD> historyMinuteMapping = historyMinuteData.computeIfAbsent(adId, key -> new HashMap<>());
+        AdStatOfMinuteDWD yesterdayMinuteDWD = historyMinuteMapping.get(DateUtil.formatLocalDate(beginTime.minusDays(1L).toLocalDate()));
+        // earlier data
+        String lastQueryDay = lastAdDayQueryDay.get(adId);
+        if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
+            // look back 30 days
+            LocalDate endDay = beginTime.minusDays(2L).toLocalDate(), beginDay = endDay.minusDays(30L);
+            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + adId + ";");
+            instance.waitForSuccess();
+            List<Record> records = SQLTask.getResult(instance);
+            List<AdStatOfDayDWD> temp = records.stream().map(AdStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {
+                if (val1.getStatDay().equals(val2.getStatDay())) {
+                    return Long.compare(val1.getCreateTime().getTime(), val2.getCreateTime().getTime());
+                }
+                return DateUtil.parseLocalDate(val1.getStatDay()).compareTo(DateUtil.parseLocalDate(val2.getStatDay()));
+            }).collect(Collectors.toList());
+            if(!temp.isEmpty()) {
+                historyDayData.put(adId, temp.get(temp.size() - 1));
+            }
+            System.out.println("historical daily data: " + JsonUtil.toString(historyDayData));
+            lastAdDayQueryDay.put(adId, statDay);
+        }
+        AdStatOfDayDWD beforeYesterdayDayDWD = historyDayData.get(adId);
+        Map<String, AdStatOfMinuteDWD> lastReduceMapping = lastReduceData.computeIfAbsent(adId, key -> new HashMap<>());
+        // aggregate all of the day's data
+        AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceMapping.get(DateUtil.formatOfMinuteReduce(beginTime.minusMinutes(5L))), now);
+        collector.collect(newAdStat);
+        System.out.println("output: " + JsonUtil.toString(newAdStat));
+        lastReduceMapping.put(reduceTimeKey, newAdStat);
+        historyMinuteMapping.put(statDay, newAdStat);
+    }
+
+    @Override
+    public void clear(ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context) throws Exception {
+        System.out.println("window closed");
+    }
+
+    private void clearData() {
+        lastAdDayQueryDay.entrySet().removeIf(entry -> DateUtil.parseLocalDate(entry.getValue()).compareTo(LocalDate.now().minusDays(3L)) <= 0);
+        historyDayData.entrySet().removeIf(entry -> DateUtil.parseLocalDate(entry.getValue().getStatDay()).compareTo(LocalDate.now().minusDays(50L)) <= 0);
+        historyMinuteData.forEach((key, value) -> {
+            if (value != null) {
+                value.entrySet().removeIf(entry2 -> DateUtil.parseLocalDate(entry2.getKey()).compareTo(LocalDate.now().minusDays(3L)) <= 0);
+            }
+        });
+        historyMinuteData.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
+
+        lastReduceData.forEach((key, value) -> {
+            if (value != null) {
+                value.entrySet().removeIf(entry -> DateUtil.parseOfMinuteReduce(entry.getKey()).compareTo(LocalDateTime.now().minusHours(1)) <= 0);
+            }
+        });
+        lastReduceData.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
+    }
+}
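
Worth noting: the caches in this class (lastAdDayQueryDay, historyDayData, historyMinuteData, lastReduceData) live on the operator heap and are not checkpointed, so after a restart they are rebuilt through the MaxCompute lookup in process(). That lookup follows the ODPS SDK's blocking query pattern; extracted as a standalone sketch (the helper name is hypothetical, and SQLTask.getResult returns size-limited result sets, so this suits small lookups only):

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;

import java.util.List;

public final class OdpsQueries {

    // Runs a SQL statement synchronously and returns the result records.
    public static List<Record> query(Odps odps, String sql) throws OdpsException {
        Instance instance = SQLTask.run(odps, sql); // submit the SQL task
        instance.waitForSuccess();                  // block until it finishes
        return SQLTask.getResult(instance);         // fetch the result rows
    }

    private OdpsQueries() {
    }
}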

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/TunnelBatchStreamSink.java

@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * This class has a serious bug; do not use it
+ * Writes data out to MaxCompute in batches
  *
  * @param <IN>
  */
@@ -99,7 +99,6 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
             // append only writes to memory
             pack.append(record);
         }
-            System.out.println("writing data ==> " + value.size());
         int retry = 0;
         do {
             try {

+ 1 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/BatchStream.java

@@ -30,6 +30,7 @@ public class BatchStream<T> {
     public SingleOutputStreamOperator<List<T>> toBatch() {
         return stream.windowAll(GlobalWindows.create())
                 .trigger(new TimerCountTrigger<>(streamName + "_trigger", maxBufferCount, bufferRefreshTime))
+                // a lambda cannot be used here: Flink cannot infer the result type
                 .apply(new AllWindowFunction<T, List<T>, GlobalWindow>() {
                     @Override
                     public void apply(GlobalWindow globalWindow, Iterable<T> iterable, Collector<List<T>> collector) throws Exception {
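
The new comment refers to Java type erasure: a lambda loses the generic signature, so Flink cannot recover List<T> as the window function's output type and fails with an InvalidTypesException while building the job graph, whereas the anonymous class keeps it. If a lambda were preferred, the apply overload that takes an explicit TypeInformation would also work; a sketch assuming a TypeInformation<T> for the element type (elementType) is available. The same applies to KeyedBatchStream below:

// Hypothetical alternative inside toBatch(): keep a lambda, hand Flink the result type explicitly.
TypeInformation<List<T>> resultType = Types.LIST(elementType);
return stream.windowAll(GlobalWindows.create())
        .trigger(new TimerCountTrigger<>(streamName + "_trigger", maxBufferCount, bufferRefreshTime))
        .apply((GlobalWindow window, Iterable<T> values, Collector<List<T>> out) ->
                out.collect(Lists.newArrayList(values)), resultType);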

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/stream/KeyedBatchStream.java

@@ -2,8 +2,6 @@ package flink.zanxiangnet.ad.monitoring.stream;
 
 import com.google.common.collect.Lists;
 import flink.zanxiangnet.ad.monitoring.trigger.TimerCountTrigger;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
@@ -32,6 +30,7 @@ public class KeyedBatchStream<T, KEY> {
     public SingleOutputStreamOperator<List<T>> toBatch() {
         return stream.window(GlobalWindows.create())
                 .trigger(new TimerCountTrigger<>(streamName + "_trigger", maxBufferCount, bufferRefreshTime))
+                // a lambda cannot be used here: Flink cannot infer the result type
                 .apply(new WindowFunction<T, List<T>, KEY, GlobalWindow>() {
                     @Override
                     public void apply(KEY key, GlobalWindow globalWindow, Iterable<T> iterable, Collector<List<T>> collector) throws Exception {

+ 5 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/TimerCountTrigger.java

@@ -46,6 +46,10 @@ public class TimerCountTrigger<T, W extends Window> extends Trigger<T, W> {
         long now = System.currentTimeMillis();
         Long windowCount = windowCountState.value() == null ? 1L : windowCountState.value() + 1;
         Long windowTime = windowTimeState.value() == null ? now : windowTimeState.value();
+        if (windowCount == 1 || windowTime == now) {
+            // register a timer that fires the processing-time callback when it expires
+            triggerContext.registerProcessingTimeTimer(triggerContext.getCurrentProcessingTime() + bufferRefreshTime);
+        }
 
         if (windowCount >= maxBufferCount || (now - windowTime) >= bufferRefreshTime) {
             windowCountState.update(0L);
@@ -57,8 +61,7 @@ public class TimerCountTrigger<T, W extends Window> extends Trigger<T, W> {
         if (windowTimeState.value() == null) {
             windowTimeState.update(now);
         }
-        // register a timer that fires the processing-time callback when it expires
-        triggerContext.registerProcessingTimeTimer(triggerContext.getCurrentProcessingTime() + bufferRefreshTime);
+
         return TriggerResult.CONTINUE;
     }