version 0.7

wcc · 3 years ago
Parent commit 8393f4aae5

+ 60 - 69
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -6,8 +6,11 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+import com.tencent.ads.model.DailyReportsGetListStruct;
 import com.tencent.ads.model.HourlyReportsGetListStruct;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfDayDTO;
+import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.KafkaProperties;
@@ -20,6 +23,11 @@ import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@@ -48,8 +56,8 @@ import org.springframework.beans.BeanUtils;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public class AdStatJob {
@@ -142,8 +150,7 @@ public class AdStatJob {
                 .trigger(new Trigger<AdDataOfMinuteODS, TimeWindow>() {
                     @Override
                     public TriggerResult onElement(AdDataOfMinuteODS adDataOfMinuteODS, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
-                        System.out.println("eventTime:" + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(time)) +
-                                " | watermark time:" + DateUtil.formatLocalDateTime(DateUtil.milliToLocalDateTime(triggerContext.getCurrentWatermark())));
+                        // FIXME The watermark time is meaningless here! The value we get is Long.MAX
                         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
                             // The window has reached its maximum lifetime
                             return TriggerResult.FIRE_AND_PURGE;
@@ -187,8 +194,10 @@ public class AdStatJob {
         SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream = adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
                 .countWindow(1).process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
                     private Odps odps;
-                    private final Map<Long, Long> lastAdQueryTime = new ConcurrentHashMap<>();
-                    private final Map<Long, Map<String, Map<Integer, AdStatOfHourDWD>>> historyAdData = new ConcurrentHashMap<>();
+                    // The day the history data was last queried
+                    private ValueState<String> lastQueryDayState;
+                    // Aggregated history data, keyed by stat day and hour
+                    private MapState<String, AdStatOfHourDWD> historyReduceState;
 
                     @Override
                     public void open(Configuration conf) {
@@ -202,58 +211,44 @@ public class AdStatJob {
                         odps.getRestClient().setRetryLogger(new MaxComputeLog());
                         odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
                         odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+
+                        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+                        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.POJO(AdStatOfHourDWD.class)));
                     }
 
                     @Override
                     public void process(Long elementsCount, ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>.Context context, Iterable<AdDataOfHourODS> iterable, Collector<AdStatOfHourDWD> collector) throws Exception {
                         AdDataOfHourODS element = iterable.iterator().next();
                         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-                        Integer hour = element.getHour();
+                        LocalDateTime statTime = LocalDateTime.of(statDay, LocalTime.of(element.getHour(), 0, 0));
                         long now = System.currentTimeMillis();
-                        Long adId = element.getAdId();
+                        LocalDate today = LocalDate.now();
 
-                        Map<String, Map<Integer, AdStatOfHourDWD>> historyData = historyAdData.computeIfAbsent(adId, key -> new HashMap<>());
-                        Long lastQueryTime = lastAdQueryTime.get(adId);
+                        String lastQueryDay = lastQueryDayState.value();
                        // Pull the given ad's historical data from MaxCompute
-                        if (lastQueryTime == null || (now - lastQueryTime) > 3 * 60 * 60 * 1000) {
-                            LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(40);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND ad_id = " + element.getAdId() + ";");
+                        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
+                            LocalDate endDay = today, beginDay = statDay.minusDays(60);
+                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";");
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
-                            List<AdStatOfHourDWD> historyList = records.stream()
+                            Map<String, AdStatOfHourDWD> historyHourMap = records.stream()
                                     .map(AdStatOfHourDWD::byMaxCompute)
                                     .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
-                                    .collect(Collectors.toList());
-                            for (AdStatOfHourDWD adStatOfHourDWD : historyList) {
-                                Map<Integer, AdStatOfHourDWD> hourMapping = historyData.computeIfAbsent(adStatOfHourDWD.getStatDay(), key -> new HashMap<>());
-                                hourMapping.put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
-                            }
-                            lastAdQueryTime.put(adId, now);
+                                    .collect(Collectors.toMap(data -> data.getStatDay() + data.getHour(), data -> data, (val1, val2) -> val2));
+                            historyReduceState.clear();
+                            historyReduceState.putAll(historyHourMap);
+                            lastQueryDayState.update(DateUtil.formatLocalDate(today));
                         }
-                        // Find this ad's stats from the previous hour of the same day
-                        AdStatOfHourDWD lastHourData = null;
-                        // Look back 40 days to find the ad's last aggregated data
-                        for (int i = 0; i < 40; i++) {
-                            Map<Integer, AdStatOfHourDWD> mapData = historyData.get(DateUtil.formatLocalDate(statDay.minusDays(i)));
-                            if (mapData == null || mapData.isEmpty()) {
-                                continue;
-                            }
-                            for (Map.Entry<Integer, AdStatOfHourDWD> temp : mapData.entrySet()) {
-                                if (temp.getKey() >= hour) {
-                                    continue;
-                                }
-                                if (lastHourData != null && lastHourData.getHour() >= temp.getKey()) {
-                                    continue;
-                                }
-                                lastHourData = temp.getValue();
-                            }
-                            if (lastHourData != null) {
+                        AdStatOfHourDWD lastReduceData = null;
+                        for (int i = 1; i < 60 * 24; i++) {
+                            LocalDateTime time = statTime.minusHours(i);
+                            lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(time.toLocalDate()) + time.getHour());
+                            if (lastReduceData != null) {
                                 break;
                             }
                         }
-                        AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(lastHourData, element, now);
-                        Map<Integer, AdStatOfHourDWD> hourDataMapping = historyData.computeIfAbsent(newStatData.getStatDay(), key -> new HashMap<>());
-                        hourDataMapping.put(newStatData.getHour(), newStatData);
+
+                        AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(lastReduceData, element, now);
                         collector.collect(newStatData);
                     }
                 });
@@ -264,7 +259,7 @@ public class AdStatJob {
 
 
        // ------------------------------------------------------- Process the ads' daily data -----------------------------------------
-        /*Properties adStreamOfDayProps = new Properties();
+        Properties adStreamOfDayProps = new Properties();
         adStreamOfDayProps.load(AdStatJob.class.getResourceAsStream("/ad_stream_of_day.properties"));
         KafkaSource<String> adStreamOfDaySource = buildKafkaSource(adStreamOfDayProps);
 
@@ -320,14 +315,13 @@ public class AdStatJob {
         });
 
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
-                // Assign watermarks with 10 days of allowed lateness (should be: to allow a 10-day rollback), and set the event time
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfDayODS>forBoundedOutOfOrderness(Duration.ofDays(10L))
-                        .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfDayODS>) (adDay, l) -> DateUtil.localDateToMilli(DateUtil.parseLocalDate(adDay.getStatDay())))
-                ).keyBy(AdDataOfDayODS::getAdId)
+                .keyBy(AdDataOfDayODS::getAdId)
                 .countWindow(1).process(new ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>() {
                     private Odps odps;
-                    private final Map<Long, Long> lastAdQueryTime = new ConcurrentHashMap<>();
-                    private final Map<Long, Map<String, AdStatOfDayDWD>> historyAdData = new ConcurrentHashMap<>();
+                    // The day the history data was last queried
+                    private ValueState<String> lastQueryDayState;
+                    // Previously aggregated data for earlier days
+                    private MapState<String, AdStatOfDayDWD> historyReduceState;
 
                     @Override
                     public void open(Configuration conf) {
@@ -341,44 +335,41 @@ public class AdStatJob {
                         odps.getRestClient().setRetryLogger(new MaxComputeLog());
                         odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
                         odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+
+                        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+                        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", String.class, AdStatOfDayDWD.class));
                     }
 
                     @Override
                     public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>.Context context, Iterable<AdDataOfDayODS> iterable, Collector<AdStatOfDayDWD> collector) throws Exception {
                         AdDataOfDayODS element = iterable.iterator().next();
-                        Long adId = element.getAdId();
                         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
                         long now = System.currentTimeMillis();
 
-                        Map<String, AdStatOfDayDWD> historyData = historyAdData.computeIfAbsent(adId, key -> new HashMap<>());
-                        Long lastQueryTime = lastAdQueryTime.get(adId);
+                        String lastQueryDay = lastQueryDayState.value();
                        // Look up the ad's historical data from MaxCompute
-                        if (lastQueryTime == null || (now - lastQueryTime) > 60 * 60 * 1000) {
+                        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
                             LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
                             Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND ad_id = " + element.getAdId() + ";");
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
-                            List<AdStatOfDayDWD> list = records.stream()
+                            Map<String, AdStatOfDayDWD> historyData = records.stream()
                                     .map(AdStatOfDayDWD::byMaxCompute)
-                                    .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue()).collect(Collectors.toList());
-                            for (AdStatOfDayDWD adStatOfDayDWD : list) {
-                                historyData.put(adStatOfDayDWD.getStatDay(), adStatOfDayDWD);
-                            }
-                            lastAdQueryTime.put(adId, now);
+                                    .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
+                                    .collect(Collectors.toMap(AdStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
+                            historyReduceState.clear();
+                            historyReduceState.putAll(historyData);
+                            lastQueryDayState.update(DateUtil.formatLocalDate(LocalDate.now()));
                         }
-                        AdStatOfDayDWD newStatData = null;
+                        AdStatOfDayDWD lastReduceData = null;
                         for (int i = 1; i <= 60; i++) {
-                            AdStatOfDayDWD oldStatData = historyData.get(DateUtil.formatLocalDate(statDay.minusDays(i)));
-                            if (oldStatData == null) {
-                                continue;
+                            lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(statDay.minusDays(i)));
+                            if (lastReduceData != null) {
+                                break;
                             }
-                            newStatData = AdStatOfDayDWD.reduce(oldStatData, element, now);
-                            break;
                         }
-                        if (newStatData == null) {
-                            newStatData = AdStatOfDayDWD.reduce(null, element, now);
-                        }
-                        historyData.put(newStatData.getStatDay(), newStatData);
+                        AdStatOfDayDWD newStatData = AdStatOfDayDWD.reduce(lastReduceData, element, now);
+                        historyReduceState.put(DateUtil.formatLocalDate(statDay), newStatData);
                         collector.collect(newStatData);
                     }
                 });
@@ -388,7 +379,7 @@ public class AdStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));
 
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
-                .keyBy((KeySelector<AdDataOfDayODS, Long>) AdDataOfDayODS::getAdId)
+                .keyBy(AdDataOfDayODS::getAdId)
                 .countWindow(1).process(new ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>() {
                    // Result of the previous aggregation
                     private AdStatOfDayDWD lastReduce;
@@ -403,7 +394,7 @@ public class AdStatJob {
         //.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
         new KeyedBatchStream<>("adDayDWDYearStream", adDayDWDYearStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
-                .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));*/
+                .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));
 
         env.execute();
     }
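
The running theme of this file's changes: per-ad ConcurrentHashMap caches are replaced with Flink keyed state, so history is scoped to the current key (the ad id), checkpointed, and restored on failover instead of living and dying with the TaskManager. A minimal, self-contained sketch of the pattern; the class name and simplified String types are illustrative, not from the commit:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class KeyedHistorySketch extends KeyedProcessFunction<Long, String, String> {

        // Declared once in open(); Flink scopes every read/write to the current key,
        // so no Map<adId, ...> bookkeeping (or its periodic cleanup) is needed.
        private transient ValueState<String> lastQueryDayState;
        private transient MapState<String, String> historyReduceState;

        @Override
        public void open(Configuration conf) {
            lastQueryDayState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastQueryDayState", String.class));
            historyReduceState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("historyReduceState", String.class, String.class));
        }

        @Override
        public void processElement(String day, Context ctx, Collector<String> out) throws Exception {
            // Refresh the cached history at most once per day, as the job above does.
            if (!day.equals(lastQueryDayState.value())) {
                historyReduceState.clear();        // drop the stale history
                historyReduceState.put(day, day);  // stand-in for the MaxCompute rows
                lastQueryDayState.update(day);
            }
            out.collect(historyReduceState.get(day));
        }
    }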

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanStatJob.java

@@ -186,7 +186,7 @@ public class PlanStatJob {
                                         Iterable<AdDataOfMinuteODS> iterable, Collector<AdStatOfMinuteDWD> collector) throws Exception {
                         LocalDateTime beginTime = DateUtil.milliToLocalDateTime(context.window().getStart());
                         String statDay = DateUtil.formatLocalDate(beginTime.toLocalDate());
-                        String reduceTimeKey = DateUtil.formatOfMinuteReduce(beginTime);
+                        String reduceTimeKey = "";
                         int hour = beginTime.getHour();
                         long now = System.currentTimeMillis();
                         AdDataOfMinuteODS statODS = null;
@@ -237,7 +237,7 @@ public class PlanStatJob {
                         AdStatOfDayDWD beforeYesterdayDayDWD = historyDayData.get(adId);
                         Map<String, AdStatOfMinuteDWD> lastReduceMapping = lastReduceData.computeIfAbsent(adId, key -> new HashMap<>());
                        // Aggregate all of the current day's data
-                        AdStatOfMinuteDWD nowAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceMapping.get(DateUtil.formatOfMinuteReduce(beginTime.minusMinutes(5L))), now);
+                        AdStatOfMinuteDWD nowAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, null, now);
                         collector.collect(nowAdStat);
                         lastReduceMapping.put(reduceTimeKey, nowAdStat);
                         historyMinuteMapping.put(statDay, nowAdStat);
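
With reduceTimeKey blanked and the previous-aggregate argument now null, this window no longer chains to the prior 5-minute result. The keyed-state replacement for that chaining lives in AdMinuteDWDProcess (next file); a sketch of it, assuming getStatTime() returns the window-start epoch millis as the comparison in that file implies:

    // The previous 5-minute aggregate is kept in a single ValueState and
    // discarded when it is not exactly one window (5 minutes) behind.
    long windowStart = context.window().getStart();
    AdStatOfMinuteDWD last = lastReduceState.value();
    if (last != null && last.getStatTime() != windowStart - 5 * 60 * 1000L) {
        last = null; // stale: not the immediately preceding window
    }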

+ 39 - 56
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -15,6 +15,7 @@ import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -24,24 +25,21 @@ import org.apache.flink.util.Collector;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow> {
-    // Object lock to keep the MaxCompute Tunnel object from being initialized more than once
-    private static final Object DUMMY_LOCK = new Object();
     private Odps odps;
-    private long lastClearTime = System.currentTimeMillis();
-    private final Map<Long, String> lastAdDayQueryDay = new ConcurrentHashMap<>();
+
    // Historical day data
-    private final Map<Long, AdStatOfDayDWD> historyDayData = new ConcurrentHashMap<>();
+    private ValueState<AdStatOfDayDWD> historyDayState;
+    // The day the history data was last queried
+    private ValueState<String> lastQueryDayState;
    // Previously aggregated data from yesterday
-    private final Map<Long, Map<String, AdStatOfMinuteDWD>> historyMinuteData = new ConcurrentHashMap<>();
+    private MapState<String, AdStatOfMinuteDWD> yesterdayMinuteState;
    // The aggregate from the previous 5 minutes
-    private final Map<Long, Map<String, AdStatOfMinuteDWD>> lastReduceData = new ConcurrentHashMap<>();
+    private ValueState<AdStatOfMinuteDWD> lastReduceState;
 
     @Override
     public void open(Configuration conf) {
@@ -55,20 +53,24 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         odps.getRestClient().setRetryLogger(new MaxComputeLog());
         odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
         odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+
+        historyDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("historyDayState", AdStatOfDayDWD.class));
+        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+        yesterdayMinuteState = getRuntimeContext().getMapState(new MapStateDescriptor<>("yesterdayMinuteState", String.class, AdStatOfMinuteDWD.class));
+        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", AdStatOfMinuteDWD.class));
     }
 
     @Override
     public void process(Long elementCount, ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context,
                         Iterable<AdDataOfMinuteODS> iterable, Collector<AdStatOfMinuteDWD> collector) throws Exception {
-        MapState<String, AdStatOfMinuteDWD> historyMinuteState = context.windowState().getMapState(new MapStateDescriptor<>("historyMinuteData", String.class, AdStatOfMinuteDWD.class));
-        LocalDateTime beginTime = DateUtil.milliToLocalDateTime(context.window().getStart());
-        String statDay = DateUtil.formatLocalDate(beginTime.toLocalDate());
-        String reduceTimeKey = DateUtil.formatOfMinuteReduce(beginTime);
-        int hour = beginTime.getHour();
+        long beginTime = context.window().getStart();
+        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
+        LocalDate beginDate = beginDateTime.toLocalDate();
+        String statDay = DateUtil.formatLocalDate(beginDate);
+        int hour = beginDateTime.getHour();
         long now = System.currentTimeMillis();
         AdDataOfMinuteODS statODS = null;
         List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
-        System.out.println("windowCount:" + adDataOfMinuteODSList);
         for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {
             adDataOfMinuteODSList.add(adDataOfMinuteODS);
             if (adDataOfMinuteODS.getHour() != hour) {
@@ -77,74 +79,55 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             statODS = adDataOfMinuteODS;
             break;
         }
+        System.out.println("windowCount:" + adDataOfMinuteODSList.size());
         if (statODS == null) {
             return;
         }
 
-        // Clean up the cached data every 12 hours
-        if (now - lastClearTime > 12 * 60 * 60 * 1000) {
-            synchronized (DUMMY_LOCK) {
-                if (now - lastClearTime > 12 * 60 * 60 * 1000) {
-                    clearData();
-                    lastClearTime = now;
-                }
-            }
-        }
         Long adId = statODS.getAdId();
 
        // Yesterday's aggregated data
-        Map<String, AdStatOfMinuteDWD> historyMinuteMapping = historyMinuteData.computeIfAbsent(adId, key -> new HashMap<>());
-        AdStatOfMinuteDWD yesterdayMinuteDWD = historyMinuteMapping.get(DateUtil.formatLocalDate(beginTime.minusDays(1L).toLocalDate()));
+        AdStatOfMinuteDWD yesterdayMinuteDWD = yesterdayMinuteState.get(DateUtil.formatLocalDate(beginDate.minusDays(1L)));
        // Earlier data
-        String lastQueryDay = lastAdDayQueryDay.get(adId);
+        String lastQueryDay = lastQueryDayState.value();
         if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
            // Look back 30 days
-            LocalDate endDay = beginTime.minusDays(2L).toLocalDate(), beginDay = endDay.minusDays(30L);
+            LocalDate endDay = beginDate.minusDays(2L), beginDay = endDay.minusDays(30L);
             Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND ad_id = " + adId + ";");
             instance.waitForSuccess();
             List<Record> records = SQLTask.getResult(instance);
-            List<AdStatOfDayDWD> temp = records.stream().map(AdStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {
+            List<AdStatOfDayDWD> historyDayData = records.stream().map(AdStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {
                 if (val1.getStatDay().equals(val2.getStatDay())) {
                     return new Long(val1.getCreateTime().getTime() - val2.getCreateTime().getTime()).intValue();
                 }
                 return DateUtil.parseLocalDate(val1.getStatDay()).compareTo(DateUtil.parseLocalDate(val2.getStatDay()));
             }).collect(Collectors.toList());
-            if(!temp.isEmpty()) {
-                historyDayData.put(adId, temp.get(temp.size() - 1));
+            if (!historyDayData.isEmpty()) {
+                historyDayState.update(historyDayData.get(historyDayData.size() - 1));
+            }
+            lastQueryDayState.update(statDay);
+        }
+        AdStatOfDayDWD beforeYesterdayDayDWD = historyDayState.value();
+
+        AdStatOfMinuteDWD lastReduceData = lastReduceState.value();
+        if (lastReduceData != null) {
+            if (lastReduceData.getStatTime() != (beginTime - 5 * 60 * 1000L)) {
+                lastReduceData = null;
             }
-            System.out.println(JsonUtil.toString("historical day data: " + historyDayData));
-            lastAdDayQueryDay.put(adId, statDay);
         }
-        AdStatOfDayDWD beforeYesterdayDayDWD = historyDayData.get(adId);
-        Map<String, AdStatOfMinuteDWD> lastReduceMapping = lastReduceData.computeIfAbsent(adId, key -> new HashMap<>());
        // Aggregate all of the current day's data
-        AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceMapping.get(DateUtil.formatOfMinuteReduce(beginTime.minusMinutes(5L))), now);
+        AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceData, now);
         collector.collect(newAdStat);
        System.out.println("output: " + JsonUtil.toString(newAdStat));
-        lastReduceMapping.put(reduceTimeKey, newAdStat);
-        historyMinuteMapping.put(statDay, newAdStat);
+        lastReduceState.update(newAdStat);
+        yesterdayMinuteState.put(statDay, newAdStat);
+        yesterdayMinuteState.remove(DateUtil.formatLocalDate(beginDate.minusDays(2L)));
+        yesterdayMinuteState.remove(DateUtil.formatLocalDate(beginDate.minusDays(3L)));
+        yesterdayMinuteState.remove(DateUtil.formatLocalDate(beginDate.minusDays(4L)));
     }
 
     @Override
     public void clear(ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context) throws Exception {
        System.out.println("window closed");
     }
-
-    private void clearData() {
-        lastAdDayQueryDay.entrySet().removeIf(entry -> DateUtil.parseLocalDate(entry.getValue()).compareTo(LocalDate.now().minusDays(3L)) <= 0);
-        historyDayData.entrySet().removeIf(entry -> DateUtil.parseLocalDate(entry.getValue().getStatDay()).compareTo(LocalDate.now().minusDays(50L)) <= 0);
-        historyMinuteData.forEach((key, value) -> {
-            if (value != null) {
-                value.entrySet().removeIf(entry2 -> DateUtil.parseLocalDate(entry2.getKey()).compareTo(LocalDate.now().minusDays(3L)) <= 0);
-            }
-        });
-        historyMinuteData.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
-
-        lastReduceData.forEach((key, value) -> {
-            if (value != null) {
-                value.entrySet().removeIf(entry -> DateUtil.parseOfMinuteReduce(entry.getKey()).compareTo(LocalDateTime.now().minusHours(1)) <= 0);
-            }
-        });
-        lastReduceData.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
-    }
 }
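
The rewrite also drops the hand-rolled clearData() sweep that ran every 12 hours; stale minute aggregates are now evicted by deleting old day keys right after each write. An alternative worth noting (not in this commit) is Flink's state TTL, which expires entries declaratively; a sketch, with an illustrative retention period:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // In open(): let Flink expire old minute aggregates instead of
    // removing day keys manually after every write.
    MapStateDescriptor<String, AdStatOfMinuteDWD> desc =
            new MapStateDescriptor<>("yesterdayMinuteState", String.class, AdStatOfMinuteDWD.class);
    StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(2))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
    desc.enableTimeToLive(ttl);
    yesterdayMinuteState = getRuntimeContext().getMapState(desc);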

+ 4 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/TunnelBatchStreamSink.java

@@ -1,6 +1,7 @@
 package flink.zanxiangnet.ad.monitoring.sink;
 
 import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
@@ -10,6 +11,7 @@ import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.slf4j.Logger;
@@ -84,7 +86,8 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
     public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
         T element = value.get(0);
         String partitionStr = generatePartitionStr(element);
-        TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, partitionStr);
+        System.out.println("[" + tableName + "] rows written: " + value.size() + ", partition: " + partitionStr);
+        TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr), true);
         TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
         for (T t : value) {
             Record record = uploadSession.newRecord();
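
The upload session is now created from a parsed PartitionSpec, with a blank partition string mapped to null for non-partitioned tables. A sketch of that handling; the trailing boolean is assumed to ask the tunnel service to create a missing partition (confirm its meaning against the odps-sdk version in use):

    // Blank partition string -> null (non-partitioned table); otherwise parse it.
    PartitionSpec spec = StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr);
    TableTunnel.StreamUploadSession session =
            tunnel.createStreamUploadSession(projectName, tableName, spec, true);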

+ 0 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/util/DateUtil.java

@@ -21,10 +21,6 @@ public class DateUtil {
         return dateTime.format(FORMAT_DATETIME);
     }
 
-    public static String formatOfMinuteReduce(LocalDateTime dateTime) {
-        return dateTime.format(FORMAT_MINUTE_REDUCE);
-    }
-
     public static LocalDateTime parseOfMinuteReduce(String reduceTime) {
         return LocalDateTime.parse(reduceTime, FORMAT_MINUTE_REDUCE);
     }