Kaynağa Gözat

增加清理逻辑

wcc 3 yıl önce
ebeveyn
işleme
9b766236b9
14 değiştirilmiş dosya ile 113 ekleme ve 122 silme
  1. 1 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java
  2. 0 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStatJob.java
  3. 0 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java
  4. 0 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java
  5. 4 5
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java
  6. 4 7
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollYearProcess.java
  7. 8 10
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java
  8. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java
  9. 77 75
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java
  10. 2 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java
  11. 3 4
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java
  12. 6 8
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollYearProcess.java
  13. 6 4
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java
  14. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -82,13 +82,12 @@ public class AdDayStreamJob {
         // 回滚 30天的数据计算
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDMonthStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
                 .keyBy(AdDataOfDayODS::getAdId)
-                .countWindow(1)
                 .process(new AdDayDWDRollMonthProcess());
 
         // 单个账号回滚一年
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
                 .keyBy(AdDataOfDayODS::getAdId)
-                .countWindow(1).process(new AdDayDWDRollYearProcess());
+                .process(new AdDayDWDRollYearProcess());
 
         DataStream<AdStatOfDayDWD> adDayStream = adDayDWDMonthStream.union(adDayDWDYearStream);
         // 写入 maxCompute

+ 0 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStatJob.java

@@ -110,7 +110,6 @@ public class AdHourStatJob {
         // 小时流-计算
         SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream =
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
-                        .countWindow(1)
                         .process(new AdHourDWDProcess());
 
         DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.getSideOutput(adHourFromMinuteStreamTag)
@@ -126,7 +125,6 @@ public class AdHourStatJob {
         SingleOutputStreamOperator<CostHourDM> clickhouseHourDmStream =
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
-                        .countWindow(1)
                         .process(new CostHourProcess())
                         .name("sink_ad_hour_dm_clickhouse");
 

+ 0 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -76,13 +76,11 @@ public class PlanDayStreamJob {
         // 每日回滚 30天
         SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDMonthStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
                 .keyBy(AdDataOfDayODS::getCampaignId)
-                .countWindow(1)
                 .process(new PlanDayDWDRollMonthProcess());
 
         // 往前回滚 365天
         SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
                 .keyBy(AdDataOfDayODS::getCampaignId)
-                .countWindow(1)
                 .process(new PlanDayDWDRollYearProcess());
 
         DataStream<PlanStatOfDayDWD> planDayDWDStream = planDayDWDMonthStream.union(planDayDWDYearStream);

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanHourStreamJob.java

@@ -78,7 +78,6 @@ public class PlanHourStreamJob {
 
         // 小时流
         SingleOutputStreamOperator<PlanStatOfHourDWD> planHourDWDStream = adODSStream.getSideOutput(adHourStreamTag).keyBy(AdDataOfHourODS::getCampaignId)
-                .countWindow(1)
                 .process(new PlanHourDWDProcess());
         DataStream<PlanStatOfHourDWD> planHourDWDAllStream = planMinuteDWDStream.getSideOutput(planHourFromMinuteStreamTag).union(planHourDWDStream);
         new KeyedBatchStream<>("planHourDWDStream", planHourDWDAllStream.keyBy(PlanStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)

+ 4 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java

@@ -12,6 +12,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.util.Collector;
@@ -28,7 +29,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class AdDayDWDRollMonthProcess extends ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow> {
+public class AdDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDataOfDayODS, AdStatOfDayDWD> {
     private SqlSessionFactory sqlSessionFactory;
     // 上次查询的时间
     private ValueState<String> lastQueryDayState;
@@ -63,11 +64,9 @@ public class AdDayDWDRollMonthProcess extends ProcessWindowFunction<AdDataOfDayO
     }
 
     @Override
-    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>.Context context,
-                        Iterable<AdDataOfDayODS> iterable, Collector<AdStatOfDayDWD> collector) throws Exception {
-        AdDataOfDayODS element = iterable.iterator().next();
+    public void processElement(AdDataOfDayODS element, KeyedProcessFunction<Long, AdDataOfDayODS, AdStatOfDayDWD>.Context context, Collector<AdStatOfDayDWD> collector) throws Exception {
         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-        long now = context.currentProcessingTime();
+        long now = System.currentTimeMillis();
         long adId = element.getAdId();
 
         String lastQueryDay = lastQueryDayState.value();

+ 4 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollYearProcess.java

@@ -6,12 +6,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
 
 @Slf4j
-public class AdDayDWDRollYearProcess extends ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow> {
+public class AdDayDWDRollYearProcess extends KeyedProcessFunction<Long, AdDataOfDayODS, AdStatOfDayDWD> {
     // 上次聚合的结果
     private ValueState<AdStatOfDayDWD> lastReduceState;
 
@@ -20,11 +19,9 @@ public class AdDayDWDRollYearProcess extends ProcessWindowFunction<AdDataOfDayOD
     }
 
     @Override
-    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>.Context context,
-                        Iterable<AdDataOfDayODS> elements, Collector<AdStatOfDayDWD> out) throws Exception {
-        AdDataOfDayODS element = elements.iterator().next();
+    public void processElement(AdDataOfDayODS element, KeyedProcessFunction<Long, AdDataOfDayODS, AdStatOfDayDWD>.Context context, Collector<AdStatOfDayDWD> collector) throws Exception {
         AdStatOfDayDWD newStatDWD = AdStatOfDayDWD.reduce(lastReduceState.value(), element, System.currentTimeMillis());
         lastReduceState.update(newStatDWD);
-        out.collect(newStatDWD);
+        collector.collect(newStatDWD);
     }
 }

+ 8 - 10
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -14,8 +14,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.ibatis.datasource.DataSourceFactory;
 import org.apache.ibatis.mapping.Environment;
@@ -25,8 +24,6 @@ import org.apache.ibatis.session.SqlSessionFactoryBuilder;
 import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
 
 import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -35,7 +32,7 @@ import java.util.stream.Collectors;
  * 往前回滚 10天的小时数据(警告,此数据依赖小时数据必定在天数据之后拉取来实现的。不然会拉下昨天的数据没有统计进来)
  */
 @Slf4j
-public class AdHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow> {
+public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS, AdStatOfHourDWD> {
     private SqlSessionFactory sqlSessionFactory;
     // 上次查询的天数据
     private ValueState<String> lastQueryDayState;
@@ -73,11 +70,9 @@ public class AdHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, AdS
     }
 
     @Override
-    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>.Context context,
-                        Iterable<AdDataOfHourODS> iterable, Collector<AdStatOfHourDWD> collector) throws Exception {
-        AdDataOfHourODS element = iterable.iterator().next();
+    public void processElement(AdDataOfHourODS element, KeyedProcessFunction<Long, AdDataOfHourODS, AdStatOfHourDWD>.Context context, Collector<AdStatOfHourDWD> collector) throws Exception {
         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-        long now = context.currentProcessingTime();
+        long now = System.currentTimeMillis();
         LocalDate today = LocalDate.now();
         long adId = element.getAdId();
 
@@ -86,7 +81,7 @@ public class AdHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, AdS
         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
             try (SqlSession session = sqlSessionFactory.openSession()) {
                 AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
-                Map<String, AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null,  DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60).stream()
+                Map<String, AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60).stream()
                         .collect(Collectors.toMap(AdStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
                 historyReduceState.clear();
                 historyReduceState.putAll(historyData);
@@ -107,5 +102,8 @@ public class AdHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, AdS
         AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(yesterdayReduceData, lastReduce, element, now);
         lastReduceState.put(newStatData.getStatDay(), newStatData);
         collector.collect(newStatData);
+
+        // 往前清理 15天的数据
+        lastReduceState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
     }
 }

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -84,7 +84,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         LocalDate beginDate = beginDateTime.toLocalDate();
         String statDay = DateUtil.formatLocalDate(beginDate);
         int hour = beginDateTime.getHour();
-        long now = context.currentProcessingTime();
+        long now = System.currentTimeMillis();
         AdDataOfMinuteODS statODS = null;
         List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
         for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {

+ 77 - 75
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -1,38 +1,34 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.text.SimpleDateFormat;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.time.LocalDate;
+import java.util.*;
 
-public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow> {
-    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
-    private Connection connection = null;
+public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, CostHourDM> {
+
+    private MapState<String, Map<Integer, AdStatOfHourDWD>> historyReduceState;
 
 
     @Override
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
-        Properties props = new Properties();
-        props.load(CostHourProcess.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props);
+        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfHourDWD.class))));
     }
 
     //数据格式转换
-    public CostHourDM datachange(AdStatOfHourDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
+    public CostHourDM dataChange(AdStatOfHourDWD adStatOfMinuteDWD) {
+        CostHourDM costHourDM = new CostHourDM();
         //时间-天
         costHourDM.dt = adStatOfMinuteDWD.getStatDay();
         //计划 id
@@ -202,71 +198,77 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
         return costHourDM;
     }
 
-
     @Override
-    public void process(Long elementCount, ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow>.Context context,
-                        Iterable<AdStatOfHourDWD> iterable, Collector<CostHourDM> collector) throws Exception {
-
-        //获取前几分钟
-
-
-        List<AdStatOfHourDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
-
-        for (AdStatOfHourDWD adStatOfMinuteDWD : iterable) {
-            adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
-            CostHourDM costHourDM = new CostHourDM();
-
-
-            //当前天
-            String statDay = adStatOfMinuteDWD.getStatDay();
-            //当前小时
-            int hour = adStatOfMinuteDWD.getHour();
-            String tmpHour = "";
-            int tmpHourInt = adStatOfMinuteDWD.getHour() - 1;
-            tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
-            String lastHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
-
-            tmpHourInt = adStatOfMinuteDWD.getHour() - 2;
-            tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
-            String lastTwoHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
-
-            tmpHourInt = adStatOfMinuteDWD.getHour() - 3;
-            tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
-            String lastThreeHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
-            //TODO:优化为本地拿到小时数据
-            String adId = adStatOfMinuteDWD.getAdId().toString();
-            String sql = "select " +
-                    "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
-                    "if(hour='" + lastTwoHour + "',cost_hour,0) last_two_hour_cost, " +
-                    "if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0) cost_last_hour_diff, " +
-                    "(if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0))*(if(hour='" + lastTwoHour + "',cost_hour,0) - if(hour='" + lastThreeHour + "',cost_hour,0)) cost_last_three_trend " +
-                    "from data_monitoring.cost_hour ch " +
-                    "where dt='" + statDay + "' and ad_id='" + adId + "' ";
+    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, CostHourDM>.Context context,
+                               Collector<CostHourDM> collector) throws Exception {
+        LocalDate day = DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay());
+        Integer hour = adStatOfHourDWD.getHour();
 
-//            System.out.println(sql);
-            Statement statement = connection.createStatement();
-            ResultSet rs = statement.executeQuery(sql);
-            while (rs.next()) {
-                costHourDM.costLastHour = rs.getLong(1);
-                costHourDM.costLastTwoHour = rs.getLong(2);
-                costHourDM.costLastHourDiff = rs.getLong(3);
-                costHourDM.costLastThreeTrend = rs.getLong(4);
+        LocalDate lastHourDay = day, lastTwoHourDay = day, lastThreeHourDay = day;
+        int lastHour = hour - 1, lastTwoHour = hour - 2, lastThreeHour = hour - 3;
+        if (hour == 0) {
+            lastHourDay = day.minusDays(1L);
+            lastHour = 23;
+            lastTwoHourDay = day.minusDays(1L);
+            lastTwoHour = 22;
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 21;
+        } else if (hour == 1) {
+            lastTwoHourDay = day.minusDays(1L);
+            lastTwoHour = 23;
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 22;
+        } else if (hour == 2) {
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 23;
+        }
+        long costDiff = 0L, costLastHour = 0L;
+        Map<Integer, AdStatOfHourDWD> lastHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastHourDay));
+        if (lastHourMapping != null && !lastHourMapping.isEmpty()) {
+            AdStatOfHourDWD lastHourDWD = lastHourMapping.get(lastHour);
+            if (lastHourDWD != null) {
+                costLastHour = lastHourDWD.getCostHour();
+                costDiff = adStatOfHourDWD.getCostHour() - lastHourDWD.getCostHour();
             }
+        }
+        long costLastHourDiff = 0, costLastTwoHour = 0;
+        Map<Integer, AdStatOfHourDWD> lastTwoHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastTwoHourDay));
+        if (lastTwoHourMapping != null && !lastTwoHourMapping.isEmpty()) {
+            AdStatOfHourDWD lastTwoHourDWD = lastTwoHourMapping.get(lastTwoHour);
+            if (lastTwoHourDWD != null) {
+                costLastTwoHour = lastTwoHourDWD.getCostHour();
+                costLastHourDiff = costLastHour - lastTwoHourDWD.getCostHour();
+            }
+        }
+        long costLastTwoHourDiff = 0, costLastThreeTrend = 0;
+        Map<Integer, AdStatOfHourDWD> lastThreeHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastThreeHourDay));
+        if (lastThreeHourMapping != null && !lastThreeHourMapping.isEmpty()) {
+            AdStatOfHourDWD lastThreeHourDWD = lastThreeHourMapping.get(lastThreeHour);
+            if (lastThreeHourDWD != null) {
+                costLastThreeTrend = lastThreeHourDWD.getCostHour();
+                costLastTwoHourDiff = costLastTwoHour - lastThreeHourDWD.getCostHour();
+            }
+        }
 
-            CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
-
-            collector.collect(costHourDM_new);
-//            System.out.println("costhour_输入" + JsonUtil.toString(adStatOfMinuteDWD));
-//            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
+        if (historyReduceState.get(adStatOfHourDWD.getStatDay()) == null) {
+            Map<Integer, AdStatOfHourDWD> hourMapping = new HashMap<>(24);
+            hourMapping.put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
+            historyReduceState.put(adStatOfHourDWD.getStatDay(), hourMapping);
+        } else {
+            historyReduceState.get(adStatOfHourDWD.getStatDay()).put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
         }
-//        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
 
+        CostHourDM costHourDM = dataChange(adStatOfHourDWD);
+        costHourDM.setCostDiff(costDiff);
+        costHourDM.setCostLastHour(costLastHour);
+        costHourDM.setCostLastHourDiff(costLastHourDiff);
+        costHourDM.setCostLastTwoHour(costLastTwoHour);
+        costHourDM.setCostLastTwoHourDiff(costLastTwoHourDiff);
+        costHourDM.setCostLastThreeTrend(costLastThreeTrend);
 
-    }
+        collector.collect(costHourDM);
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));
 
-    @Override
-    public void clear(ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow>.Context context) throws Exception {
-        System.out.println("窗口关闭");
     }
-
 }

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -92,6 +92,8 @@ public class CostMinuteProcess extends KeyedProcessFunction<Long, AdStatOfMinute
         costMinuterDM.setCostLastThreeTrend(costLastThreeTrend);
 
         collector.collect(costMinuterDM);
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));
     }
 
     //数据格式转换

+ 3 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java

@@ -12,6 +12,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.util.Collector;
@@ -28,7 +29,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class PlanDayDWDRollMonthProcess extends ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow> {
+public class PlanDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD> {
     private SqlSessionFactory sqlSessionFactory;
     // 上次查询的时间
     private ValueState<String> lastQueryDayState;
@@ -63,9 +64,7 @@ public class PlanDayDWDRollMonthProcess extends ProcessWindowFunction<AdDataOfDa
     }
 
     @Override
-    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
-                        Iterable<AdDataOfDayODS> iterable, Collector<PlanStatOfDayDWD> collector) throws Exception {
-        AdDataOfDayODS element = iterable.iterator().next();
+    public void processElement(AdDataOfDayODS element, KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD>.Context context, Collector<PlanStatOfDayDWD> collector) throws Exception {
         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
         long now = System.currentTimeMillis();
         long adId = element.getAdId();

+ 6 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollYearProcess.java

@@ -6,13 +6,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
 
 @Slf4j
-public class PlanDayDWDRollYearProcess extends ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow> {
-    
+public class PlanDayDWDRollYearProcess extends KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD> {
+
     // 上次聚合的结果
     private ValueState<PlanStatOfDayDWD> lastReduceState;
 
@@ -21,11 +20,10 @@ public class PlanDayDWDRollYearProcess extends ProcessWindowFunction<AdDataOfDay
     }
 
     @Override
-    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
-                        Iterable<AdDataOfDayODS> elements, Collector<PlanStatOfDayDWD> out) throws Exception {
-        AdDataOfDayODS element = elements.iterator().next();
+    public void processElement(AdDataOfDayODS element, KeyedProcessFunction<Long, AdDataOfDayODS, PlanStatOfDayDWD>.Context context,
+                               Collector<PlanStatOfDayDWD> collector) throws Exception {
         PlanStatOfDayDWD newStatDWD = PlanStatOfDayDWD.reduce(lastReduceState.value(), element, System.currentTimeMillis());
-        out.collect(newStatDWD);
+        collector.collect(newStatDWD);
         lastReduceState.update(newStatDWD);
     }
 }

+ 6 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java

@@ -14,6 +14,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.util.Collector;
@@ -30,7 +31,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class PlanHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, PlanStatOfHourDWD, Long, GlobalWindow> {
+public class PlanHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS, PlanStatOfHourDWD> {
 
     private SqlSessionFactory sqlSessionFactory;
     // 上次查询的天数据
@@ -69,9 +70,8 @@ public class PlanHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, P
     }
 
     @Override
-    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfHourODS, PlanStatOfHourDWD, Long, GlobalWindow>.Context context,
-                        Iterable<AdDataOfHourODS> iterable, Collector<PlanStatOfHourDWD> collector) throws Exception {
-        AdDataOfHourODS element = iterable.iterator().next();
+    public void processElement(AdDataOfHourODS element, KeyedProcessFunction<Long, AdDataOfHourODS, PlanStatOfHourDWD>.Context context,
+                               Collector<PlanStatOfHourDWD> collector) throws Exception {
         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
         long now = System.currentTimeMillis();
         long adId = element.getAdId();
@@ -104,5 +104,7 @@ public class PlanHourDWDProcess extends ProcessWindowFunction<AdDataOfHourODS, P
         PlanStatOfHourDWD newStatData = PlanStatOfHourDWD.reduce(yesterdayReduceData, lastReduceData, element, now);
         lastReduceState.put(newStatData.getStatDay(), newStatData);
         collector.collect(newStatData);
+
+        lastReduceState.remove(DateUtil.formatLocalDate(statDay.minusDays(15L)));
     }
 }

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -85,7 +85,7 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
         LocalDate beginDate = beginDateTime.toLocalDate();
         String statDay = DateUtil.formatLocalDate(beginDate);
         int hour = beginDateTime.getHour();
-        long now = context.currentProcessingTime();
+        long now = System.currentTimeMillis();
         AdDataOfMinuteODS statODS = null;
         List<AdDataOfMinuteODS> adDataOfMinuteODSList = new ArrayList<>(24);
         for (AdDataOfMinuteODS adDataOfMinuteODS : iterable) {