Quellcode durchsuchen

优化小时数据计算

wcc vor 3 Jahren
Ursprung
Commit
bf649721e9

+ 2 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -70,8 +70,8 @@ public class AdHourStreamJob {
 
         // 分钟流-计算
         SingleOutputStreamOperator<AdStatOfMinuteDWD> adMinuteDWDStream = adMinuteODSStream
-                // 打水印,允许数据延迟 6分钟,同时指定时间流
-                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(6L))
+                // 打水印,允许数据延迟 5分钟,同时指定时间流
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<AdDataOfMinuteODS>forBoundedOutOfOrderness(Duration.ofMinutes(5L))
                         .withTimestampAssigner((SerializableTimestampAssigner<AdDataOfMinuteODS>) (adODS, l) -> adODS.getStatTime()))
                 .keyBy(AdDataOfMinuteODS::getAdId)
                 // 开一个 5分钟的滚动窗口
@@ -101,7 +101,6 @@ public class AdHourStreamJob {
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
                 .name("sink_ad_hour_ods");
 
-
         // 小时流-计算
         SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream =
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -78,7 +78,6 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,?\n)";
-//        log.error(costMinuterDM);
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         preparedStatement.setString(1, costMinuterDM.dt);
         preparedStatement.setString(2, costMinuterDM.minute);

+ 3 - 6
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourOnTimeStreamCompletionProcess.java

@@ -39,18 +39,15 @@ public class AdHourOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lo
         if (lastStatDateTime.compareTo(statDateTime) <= 0) {
             long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
             if (hours > 1) {
-                if (hours > 24) {
-                    log.error("实时:adId[{}] 要造 {}条数据, 从 {} 到 {}结束", adStatOfHourDWD.getAdId(), hours, lastStatDateTime, statDateTime);
-                }
                 long start = System.currentTimeMillis();
                 // 中间有没数据的时间段,需要进行数据填充
-                for (int i = 1; i < (hours >= 24 ? 24 : hours); i++) {
+                for (int i = 1; i < hours; i++) {
                     // 要填充的时间
                     LocalDateTime completionTime = lastStatDateTime.plusHours(i);
                     collector.collect(AdStatOfHourDWD.completion(completionTime, lastReduce));
                 }
-                if (hours > 24) {
-                    log.error("实时:给 adId[{}]造数据用时 {}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start));
+                if (hours > 12) {
+                    log.error("实时:给 adId[{}]造数据用时 {},总计造了:{},起始时间:{},截至时间;{}", adStatOfHourDWD.getAdId(), (System.currentTimeMillis() - start), hours - 1, lastStatDateTime, statDateTime);
                 }
             }
             lastReduceState.update(adStatOfHourDWD);

+ 40 - 16
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -14,7 +14,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import org.apache.ibatis.datasource.DataSourceFactory;
 import org.apache.ibatis.mapping.Environment;
 import org.apache.ibatis.session.SqlSession;
@@ -24,8 +23,10 @@ import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow> {
@@ -92,6 +93,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             statODS = adDataOfMinuteODS;
         }
         if (statODS == null) {
+            log.error("分钟流窗口没有拿到整点的小时数据!!!!");
             return;
         }
 
@@ -120,16 +122,43 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);
 
-        List<String> removeKeys = new ArrayList<>(10);
+        if (hour > 0 && (beginDateTime.getMinute() == 15 || beginDateTime.getMinute() == 30 || beginDateTime.getMinute() == 55)) {
+            // 重新输出一条上小时的数据
+            List<AdDataOfMinuteODS> lastHourODSList = adDataOfMinuteODSList.stream().filter(ods -> ods.getHour() < hour).collect(Collectors.toList());
+            AdDataOfMinuteODS lastHourODS = null;
+            for (AdDataOfMinuteODS ods : lastHourODSList) {
+                if (ods.getHour() == hour - 1) {
+                    lastHourODS = ods;
+                    break;
+                }
+            }
+            if (lastHourODS != null) {
+                LocalDateTime lastHourBeginTime = LocalDateTime.of(beginDate, LocalTime.of(lastHourODS.getHour(), 55, 0));
+                lastHourODS.setStatTime(DateUtil.localDateTimeToMilli(lastHourBeginTime));
+                AdStatOfMinuteDWD lastHourReduceData = lastReduceState.get(lastHourBeginTime.minusMinutes(5L).format(formatForLastReduceKey));
+                // 聚合当天的全部数据
+                AdStatOfMinuteDWD lastHourAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, lastHourODSList, lastHourODS, lastHourReduceData, now);
+                collector.collect(lastHourAdStat);
+            }
+        }
+
+        clearState(beginDateTime);
+    }
+
+    @Override
+    public void clear(ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context) throws Exception {
+    }
+
+    private void clearState(LocalDateTime beginDateTime) throws Exception {
+        List<String> removeKeys = new ArrayList<>(20);
         Iterator<Map.Entry<String, AdStatOfMinuteDWD>> lastIterator = lastReduceState.iterator();
+
+        String delLastReduceKey = beginDateTime.minusHours(2).format(formatForLastReduceKey);
         while (lastIterator.hasNext()) {
             Map.Entry<String, AdStatOfMinuteDWD> temp = lastIterator.next();
-            if (temp.getKey().equals(beginDateTime.format(formatForLastReduceKey))
-                    || temp.getKey().equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
-                    || temp.getKey().equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
-                continue;
+            if (temp.getKey().compareTo(delLastReduceKey) < 0) {
+                removeKeys.add(temp.getKey());
             }
-            removeKeys.add(temp.getKey());
         }
         if (!removeKeys.isEmpty()) {
             for (String key : removeKeys) {
@@ -138,14 +167,13 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         }
         removeKeys.clear();
         Iterator<Map.Entry<String, AdStatOfMinuteDWD>> yesterdayIterator = yesterdayMinuteState.iterator();
+
+        String delYesterdayKey = DateUtil.formatLocalDate(beginDateTime.toLocalDate().minusDays(5));
         while (yesterdayIterator.hasNext()) {
             Map.Entry<String, AdStatOfMinuteDWD> temp = yesterdayIterator.next();
-            if (temp.getKey().equals(DateUtil.formatLocalDate(beginDate))
-                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
-                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
-                continue;
+            if (temp.getKey().compareTo(delYesterdayKey) <= 0) {
+                removeKeys.add(temp.getKey());
             }
-            removeKeys.add(temp.getKey());
         }
         if (!removeKeys.isEmpty()) {
             for (String key : removeKeys) {
@@ -153,8 +181,4 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             }
         }
     }
-
-    @Override
-    public void clear(ProcessWindowFunction<AdDataOfMinuteODS, AdStatOfMinuteDWD, Long, TimeWindow>.Context context) throws Exception {
-    }
 }

+ 157 - 156
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -26,249 +26,250 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
         historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfHourDWD.class))));
     }
 
+    @Override
+    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, CostHourDM>.Context context,
+                               Collector<CostHourDM> collector) throws Exception {
+        LocalDate day = DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay());
+        Integer hour = adStatOfHourDWD.getHour();
+
+        LocalDate lastHourDay = day, lastTwoHourDay = day, lastThreeHourDay = day;
+        int lastHour = hour - 1, lastTwoHour = hour - 2, lastThreeHour = hour - 3;
+        if (hour == 0) {
+            lastHourDay = day.minusDays(1L);
+            lastHour = 23;
+            lastTwoHourDay = day.minusDays(1L);
+            lastTwoHour = 22;
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 21;
+        } else if (hour == 1) {
+            lastTwoHourDay = day.minusDays(1L);
+            lastTwoHour = 23;
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 22;
+        } else if (hour == 2) {
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 23;
+        }
+        long costDiff = 0L, costLastHour = 0L;
+        Map<Integer, AdStatOfHourDWD> lastHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastHourDay));
+        if (lastHourMapping != null && !lastHourMapping.isEmpty()) {
+            AdStatOfHourDWD lastHourDWD = lastHourMapping.get(lastHour);
+            if (lastHourDWD != null) {
+                costLastHour = lastHourDWD.getCostHour();
+                costDiff = adStatOfHourDWD.getCostHour() - lastHourDWD.getCostHour();
+            }
+        }
+        long costLastHourDiff = 0, costLastTwoHour = 0;
+        Map<Integer, AdStatOfHourDWD> lastTwoHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastTwoHourDay));
+        if (lastTwoHourMapping != null && !lastTwoHourMapping.isEmpty()) {
+            AdStatOfHourDWD lastTwoHourDWD = lastTwoHourMapping.get(lastTwoHour);
+            if (lastTwoHourDWD != null) {
+                costLastTwoHour = lastTwoHourDWD.getCostHour();
+                costLastHourDiff = costLastHour - lastTwoHourDWD.getCostHour();
+            }
+        }
+        long costLastTwoHourDiff = 0, costLastThreeTrend = 0;
+        Map<Integer, AdStatOfHourDWD> lastThreeHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastThreeHourDay));
+        if (lastThreeHourMapping != null && !lastThreeHourMapping.isEmpty()) {
+            AdStatOfHourDWD lastThreeHourDWD = lastThreeHourMapping.get(lastThreeHour);
+            if (lastThreeHourDWD != null) {
+                costLastThreeTrend = costLastHourDiff > 0 && costDiff > 0 ? 1 : 0;
+                costLastTwoHourDiff = costLastTwoHour - lastThreeHourDWD.getCostHour();
+            }
+        }
+
+        if (historyReduceState.get(adStatOfHourDWD.getStatDay()) == null) {
+            Map<Integer, AdStatOfHourDWD> hourMapping = new HashMap<>(24);
+            hourMapping.put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
+            historyReduceState.put(adStatOfHourDWD.getStatDay(), hourMapping);
+        } else {
+            historyReduceState.get(adStatOfHourDWD.getStatDay()).put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
+        }
+
+        CostHourDM costHourDM = dataChange(adStatOfHourDWD);
+        costHourDM.setCostDiff(costDiff);
+        costHourDM.setCostLastHour(costLastHour);
+        costHourDM.setCostLastHourDiff(costLastHourDiff);
+        costHourDM.setCostLastTwoHour(costLastTwoHour);
+        costHourDM.setCostLastTwoHourDiff(costLastTwoHourDiff);
+        costHourDM.setCostLastThreeTrend(costLastThreeTrend);
+
+        collector.collect(costHourDM);
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
+        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));
+
+    }
+
     //数据格式转换
     public CostHourDM dataChange(AdStatOfHourDWD adStatOfMinuteDWD) {
         CostHourDM costHourDM = new CostHourDM();
+
         //时间-天
-        costHourDM.dt = adStatOfMinuteDWD.getStatDay();
+        costHourDM.setDt(adStatOfMinuteDWD.getStatDay());
         //计划 id
-        costHourDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
+        costHourDM.setCampaignId(adStatOfMinuteDWD.getCampaignId().toString());
         //时间- real
-        costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
+        costHourDM.setCreateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime()));
         //时间-小时
         //TODO:之后需要进一步修改
         String tmpHour = adStatOfMinuteDWD.getHour() > 9 ? adStatOfMinuteDWD.getHour().toString() : "0" + adStatOfMinuteDWD.getHour().toString();
-        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
+        costHourDM.setHour(adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00");
         //广告id
-        costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        costHourDM.setAdId(adStatOfMinuteDWD.getAdId().toString());
         //服务商id
-        costHourDM.agencyAccountId = adStatOfMinuteDWD.getAgencyAccountId().toString();
+        costHourDM.setAgencyAccountId(adStatOfMinuteDWD.getAgencyAccountId().toString());
         //广告组id
-        costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
+        costHourDM.setAdgroupId(adStatOfMinuteDWD.getAdgroupId().toString());
         //创意id
-        costHourDM.adcreativeId = "";
+        costHourDM.setAdcreativeId("");
         //账号id
-        costHourDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
+        costHourDM.setAccountId(adStatOfMinuteDWD.getAccountId().toString());
         //总消耗
-        costHourDM.costTotal = adStatOfMinuteDWD.getCostTotal();
+        costHourDM.setCostTotal(adStatOfMinuteDWD.getCostTotal());
         //当天消耗
-        costHourDM.costDay = adStatOfMinuteDWD.getCostDay();
+        costHourDM.setCostDay(adStatOfMinuteDWD.getCostDay());
         //当天小时消耗
-        costHourDM.costHour = adStatOfMinuteDWD.getCostHour();
+        costHourDM.setCostHour(adStatOfMinuteDWD.getCostHour());
         //消耗速度
-        costHourDM.costSpeed = adStatOfMinuteDWD.getCostHour();
+        costHourDM.setCostSpeed(adStatOfMinuteDWD.getCostHour());
         //总浏览量
-        costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
+        costHourDM.setViewCountTotal(adStatOfMinuteDWD.getViewCountTotal());
         //天-总浏览量
-        costHourDM.viewCountDay = adStatOfMinuteDWD.getViewCountDay();
+        costHourDM.setViewCountDay(adStatOfMinuteDWD.getViewCountDay());
         //小时-总浏览量
-        costHourDM.viewCountHour = adStatOfMinuteDWD.getViewCountHour();
+        costHourDM.setViewCountHour(adStatOfMinuteDWD.getViewCountHour());
         //总平均千次曝光成本
-        costHourDM.thousandDisplayPriceAll = adStatOfMinuteDWD.getThousandDisplayPriceAll();
+        costHourDM.setThousandDisplayPriceAll(adStatOfMinuteDWD.getThousandDisplayPriceAll());
         //天-总平均曝光成本
-        costHourDM.thousandDisplayPriceDay = adStatOfMinuteDWD.getThousandDisplayPriceDay();
+        costHourDM.setThousandDisplayPriceDay(adStatOfMinuteDWD.getThousandDisplayPriceDay());
         //小时-总平均曝光成本
-        costHourDM.thousandDisplayPriceHour = adStatOfMinuteDWD.getThousandDisplayPriceHour();
+        costHourDM.setThousandDisplayPriceHour(adStatOfMinuteDWD.getThousandDisplayPriceHour());
         //总点击量
-        costHourDM.validClickCountTotal = adStatOfMinuteDWD.getValidClickCountTotal();
+        costHourDM.setValidClickCountTotal(adStatOfMinuteDWD.getValidClickCountTotal());
         //天-总点击量
-        costHourDM.validClickCountDay = adStatOfMinuteDWD.getValidClickCountDay();
+        costHourDM.setValidClickCountDay(adStatOfMinuteDWD.getValidClickCountDay());
         //小时-总点击量
-        costHourDM.validClickCountHour = adStatOfMinuteDWD.getValidClickCountHour();
+        costHourDM.setValidClickCountHour(adStatOfMinuteDWD.getValidClickCountHour());
         //总平均点击率
-        costHourDM.ctrAll = adStatOfMinuteDWD.getCtrAll();
+        costHourDM.setCtrAll(adStatOfMinuteDWD.getCtrAll());
         //天-总平均点击率
-        costHourDM.ctrDay = adStatOfMinuteDWD.getCtrDay();
+        costHourDM.setCtrDay(adStatOfMinuteDWD.getCtrDay());
         //小时-总平均点击率
-        costHourDM.ctrHour = adStatOfMinuteDWD.getCtrHour();
+        costHourDM.setCtrHour(adStatOfMinuteDWD.getCtrHour());
         //总点击均价
-        costHourDM.cpcAll = adStatOfMinuteDWD.getCpcAll();
+        costHourDM.setCpcAll(adStatOfMinuteDWD.getCpcAll());
         //天-总点击均价
-        costHourDM.cpcDay = adStatOfMinuteDWD.getCpcDay();
+        costHourDM.setCpcDay(adStatOfMinuteDWD.getCpcDay());
         //小时-总点击均价
-        costHourDM.cpcHour = adStatOfMinuteDWD.getCpcHour();
+        costHourDM.setCpcHour(adStatOfMinuteDWD.getCpcHour());
         //总目标转化量
-        costHourDM.conversionsCountTotal = adStatOfMinuteDWD.getConversionsCountTotal();
+        costHourDM.setConversionsCountTotal(adStatOfMinuteDWD.getConversionsCountTotal());
         //天-总目标转化量
-        costHourDM.conversionsCountDay = adStatOfMinuteDWD.getConversionsCountDay();
+        costHourDM.setConversionsCountDay(adStatOfMinuteDWD.getConversionsCountDay());
         //小时-总目标转化量
-        costHourDM.conversionsCountHour = adStatOfMinuteDWD.getConversionsCountHour();
+        costHourDM.setConversionsCountHour(adStatOfMinuteDWD.getConversionsCountHour());
         //总目标平均转化成本
-        costHourDM.conversionsCostTotal = adStatOfMinuteDWD.getConversionsCostAll();
+        costHourDM.setConversionsCostTotal(adStatOfMinuteDWD.getConversionsCostAll());
         //天-总目标平均转化成本
-        costHourDM.conversionsCostDay = adStatOfMinuteDWD.getConversionsCostDay();
+        costHourDM.setConversionsCostDay(adStatOfMinuteDWD.getConversionsCostDay());
         //小时-总目标平均转化成本
-        costHourDM.conversionsCostHour = adStatOfMinuteDWD.getConversionsCostHour();
+        costHourDM.setConversionsCostHour(adStatOfMinuteDWD.getConversionsCostHour());
         //总平均转化率
-        costHourDM.conversionsRateAll = adStatOfMinuteDWD.getConversionsRateAll();
+        costHourDM.setConversionsRateAll(adStatOfMinuteDWD.getConversionsRateAll());
         //天-总平均转化率
-        costHourDM.conversionsRateDay = adStatOfMinuteDWD.getConversionsRateDay();
+        costHourDM.setConversionsRateDay(adStatOfMinuteDWD.getConversionsRateDay());
         //小时-总平均转化率
-        costHourDM.conversionsRateHour = adStatOfMinuteDWD.getConversionsRateHour();
+        costHourDM.setConversionsRateHour(adStatOfMinuteDWD.getConversionsRateHour());
         //TODO:总首日下单roi
-        costHourDM.firstDayOrderRoiTotal = 0;
+        costHourDM.setFirstDayOrderRoiTotal(0);
         //天-总首日下单roi
-        costHourDM.firstDayOrderRoiDay = 0;
+        costHourDM.setFirstDayOrderRoiDay(0);
         //小时-总首日下单roi
-        costHourDM.firstDayOrderRoiHour = 0;
+        costHourDM.setFirstDayOrderRoiHour(0);
         //总首日下单金额
-        costHourDM.firstDayOrderAmountTotal = adStatOfMinuteDWD.getFirstDayOrderAmountTotal();
+        costHourDM.setFirstDayOrderAmountTotal(adStatOfMinuteDWD.getFirstDayOrderAmountTotal());
         //天-总首日下单金额
-        costHourDM.firstDayOrderAmountDay = adStatOfMinuteDWD.getFirstDayOrderAmountDay();
+        costHourDM.setFirstDayOrderAmountDay(adStatOfMinuteDWD.getFirstDayOrderAmountDay());
         //小时-总首日下单金额
-        costHourDM.firstDayOrderAmountHour = adStatOfMinuteDWD.getFirstDayOrderAmountHour();
+        costHourDM.setFirstDayOrderAmountHour(adStatOfMinuteDWD.getFirstDayOrderAmountHour());
         //总首日下单量
-        costHourDM.firstDayOrderCountTotal = adStatOfMinuteDWD.getFirstDayOrderCountTotal();
+        costHourDM.setFirstDayOrderCountTotal(adStatOfMinuteDWD.getFirstDayOrderCountTotal());
         //天-总首日下单量
-        costHourDM.firstDayOrderCountDay = adStatOfMinuteDWD.getFirstDayOrderCountDay();
+        costHourDM.setFirstDayOrderCountDay(adStatOfMinuteDWD.getFirstDayOrderCountDay());
         //小时-总首日下单量
-        costHourDM.firstDayOrderCountHour = adStatOfMinuteDWD.getFirstDayOrderCountHour();
+        costHourDM.setFirstDayOrderCountHour(adStatOfMinuteDWD.getFirstDayOrderCountHour());
         //总下单金额
-        costHourDM.webOrderAmountTotal = adStatOfMinuteDWD.getOrderAmountTotal();
+        costHourDM.setWebOrderAmountTotal(adStatOfMinuteDWD.getOrderAmountTotal());
         //天-总下单金额
-        costHourDM.webOrderAmountDay = adStatOfMinuteDWD.getOrderAmountDay();
+        costHourDM.setWebOrderAmountDay(adStatOfMinuteDWD.getOrderAmountDay());
         //小时-总下单金额
-        costHourDM.webOrderAmountHour = adStatOfMinuteDWD.getOrderAmountHour();
+        costHourDM.setWebOrderAmountHour(adStatOfMinuteDWD.getOrderAmountHour());
         //总平均下单成本
-        costHourDM.webOrderCostTotal = adStatOfMinuteDWD.getWebOrderCostAll();
+        costHourDM.setWebOrderCostTotal(adStatOfMinuteDWD.getWebOrderCostAll());
         //天-总平均下单成本
-        costHourDM.webOrderCostDay = adStatOfMinuteDWD.getWebOrderCostDay();
+        costHourDM.setWebOrderCostDay(adStatOfMinuteDWD.getWebOrderCostDay());
         //小时-总平均下单成本
-        costHourDM.webOrderCostHour = adStatOfMinuteDWD.getWebOrderCostHour();
+        costHourDM.setWebOrderCostHour(adStatOfMinuteDWD.getWebOrderCostHour());
         //总平均下单率
-        costHourDM.webOrderRateTotal = adStatOfMinuteDWD.getOrderRateAll();
+        costHourDM.setWebOrderRateTotal(adStatOfMinuteDWD.getOrderRateAll());
         //天-总平均下单率
-        costHourDM.webOrderRateDay = adStatOfMinuteDWD.getOrderRateDay();
+        costHourDM.setWebOrderRateDay(adStatOfMinuteDWD.getOrderRateDay());
         //小时-总平均下单率
-        costHourDM.webOrderRateHour = adStatOfMinuteDWD.getOrderRateHour();
+        costHourDM.setWebOrderRateHour(adStatOfMinuteDWD.getOrderRateHour());
         //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
-        costHourDM.webOrderCountTotal = adStatOfMinuteDWD.getOrderCountTotal();
+        costHourDM.setWebOrderCountTotal(adStatOfMinuteDWD.getOrderCountTotal());
         //天-总平均下单量
-        costHourDM.webOrderCountDay = adStatOfMinuteDWD.getOrderCountDay();
+        costHourDM.setWebOrderCountDay(adStatOfMinuteDWD.getOrderCountDay());
         //小时-总平均下单量
-        costHourDM.webOrderCountHour = adStatOfMinuteDWD.getOrderCountHour();
+        costHourDM.setWebOrderCountHour(adStatOfMinuteDWD.getOrderCountHour());
         //总下单ROI
-        costHourDM.orderRoiTotal = adStatOfMinuteDWD.getOrderRoiAll();
+        costHourDM.setOrderRoiTotal(adStatOfMinuteDWD.getOrderRoiAll());
         //天-总下单roi
-        costHourDM.orderRoiDay = adStatOfMinuteDWD.getOrderRoiDay();
+        costHourDM.setOrderRoiDay(adStatOfMinuteDWD.getOrderRoiDay());
         //小时-总下单roi
-        costHourDM.orderRoiHour = adStatOfMinuteDWD.getOrderRoiHour();
+        costHourDM.setOrderRoiHour(adStatOfMinuteDWD.getOrderRoiHour());
         //总平均下单客单价
-        costHourDM.orderUnitPriceTotal = adStatOfMinuteDWD.getOrderUnitPriceAll();
+        costHourDM.setOrderUnitPriceTotal(adStatOfMinuteDWD.getOrderUnitPriceAll());
         //天-总平均下单客单价
-        costHourDM.orderUnitPriceDay = adStatOfMinuteDWD.getOrderUnitPriceDay();
+        costHourDM.setOrderUnitPriceDay(adStatOfMinuteDWD.getOrderUnitPriceDay());
         //小时-总平均下单客单价
-        costHourDM.orderUnitPriceHour = adStatOfMinuteDWD.getOrderUnitPriceHour();
+        costHourDM.setOrderUnitPriceHour(adStatOfMinuteDWD.getOrderUnitPriceHour());
         //总公众号关注量
-        costHourDM.fromFollowUvTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        costHourDM.setFromFollowUvTotal(adStatOfMinuteDWD.getOfficialAccountFollowCountTotal());
         //天-总公众号关注量
-        costHourDM.fromFollowUvDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        costHourDM.setFromFollowUvDay(adStatOfMinuteDWD.getOfficialAccountFollowCountDay());
         //小时-总公众号关注量
-        costHourDM.fromFollowUvHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
+        costHourDM.setFromFollowUvHour(adStatOfMinuteDWD.getOfficialAccountFollowCountHour());
         //TODO:总平均公众号关注成本---是否是价格/关注
-        costHourDM.fromFollowCostTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        costHourDM.setFromFollowCostTotal(adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal());
         //天-总平均公众号关注成本
-        costHourDM.fromFollowCostDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        costHourDM.setFromFollowCostDay(adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay());
         //小时-总平均公众号关注成本
-        costHourDM.fromFollowCostHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour() == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
+        costHourDM.setFromFollowCostHour(adStatOfMinuteDWD.getOfficialAccountFollowCountHour() == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / adStatOfMinuteDWD.getOfficialAccountFollowCountHour());
         //TODO:总平均公众号关注率----确认是否对应
-        costHourDM.fromFollowRateTotal = adStatOfMinuteDWD.getOfficialAccountFollowRateAll();
+        costHourDM.setFromFollowRateTotal(adStatOfMinuteDWD.getOfficialAccountFollowRateAll());
         //天-总平均公众号关注率
-        costHourDM.fromFollowRateDay = adStatOfMinuteDWD.getOfficialAccountFollowRateDay();
+        costHourDM.setFromFollowRateDay(adStatOfMinuteDWD.getOfficialAccountFollowRateDay());
         //小时-总平均公众号关注率
-        costHourDM.fromFollowRateHour = adStatOfMinuteDWD.getOfficialAccountFollowRateHour();
+        costHourDM.setFromFollowRateHour(adStatOfMinuteDWD.getOfficialAccountFollowRateHour());
         //TODO:总注册数-----下面全是有问题的
-        costHourDM.webRegisterCountTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        costHourDM.setWebRegisterCountTotal(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal());
         //天-总注册数
-        costHourDM.webRegisterCountDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        costHourDM.setWebRegisterCountDay(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay());
         //小时-总注册数
-        costHourDM.webRegisterCountHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
+        costHourDM.setWebRegisterCountHour(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour());
         //总注册人数
-        costHourDM.webRegisterUvTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        costHourDM.setWebRegisterUvTotal(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal());
         //天-总注册人数
-        costHourDM.webRegisterUvDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        costHourDM.setWebRegisterUvDay(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay());
         //小时-总注册人数
-        costHourDM.webRegisterUvHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
+        costHourDM.setWebRegisterUvHour(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour());
         //总平均注册成本
-        costHourDM.webRegisterCostTotal = adStatOfMinuteDWD.getOfficialAccountRegisterCostAll();
+        costHourDM.setWebRegisterCostTotal(adStatOfMinuteDWD.getOfficialAccountRegisterCostAll());
         //天-总平均注册成本
-        costHourDM.webRegisterCostDay = adStatOfMinuteDWD.getOfficialAccountRegisterCostDay();
+        costHourDM.setWebRegisterCostDay(adStatOfMinuteDWD.getOfficialAccountRegisterCostDay());
         //小时-总平均注册成本
-        costHourDM.webRegisterCostHour = adStatOfMinuteDWD.getOfficialAccountRegisterCostHour();
+        costHourDM.setWebRegisterCostHour(adStatOfMinuteDWD.getOfficialAccountRegisterCostHour());
         return costHourDM;
     }
-
-    @Override
-    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, CostHourDM>.Context context,
-                               Collector<CostHourDM> collector) throws Exception {
-        LocalDate day = DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay());
-        Integer hour = adStatOfHourDWD.getHour();
-
-        LocalDate lastHourDay = day, lastTwoHourDay = day, lastThreeHourDay = day;
-        int lastHour = hour - 1, lastTwoHour = hour - 2, lastThreeHour = hour - 3;
-        if (hour == 0) {
-            lastHourDay = day.minusDays(1L);
-            lastHour = 23;
-            lastTwoHourDay = day.minusDays(1L);
-            lastTwoHour = 22;
-            lastThreeHourDay = day.minusDays(1L);
-            lastThreeHour = 21;
-        } else if (hour == 1) {
-            lastTwoHourDay = day.minusDays(1L);
-            lastTwoHour = 23;
-            lastThreeHourDay = day.minusDays(1L);
-            lastThreeHour = 22;
-        } else if (hour == 2) {
-            lastThreeHourDay = day.minusDays(1L);
-            lastThreeHour = 23;
-        }
-        long costDiff = 0L, costLastHour = 0L;
-        Map<Integer, AdStatOfHourDWD> lastHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastHourDay));
-        if (lastHourMapping != null && !lastHourMapping.isEmpty()) {
-            AdStatOfHourDWD lastHourDWD = lastHourMapping.get(lastHour);
-            if (lastHourDWD != null) {
-                costLastHour = lastHourDWD.getCostHour();
-                costDiff = adStatOfHourDWD.getCostHour() - lastHourDWD.getCostHour();
-            }
-        }
-        long costLastHourDiff = 0, costLastTwoHour = 0;
-        Map<Integer, AdStatOfHourDWD> lastTwoHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastTwoHourDay));
-        if (lastTwoHourMapping != null && !lastTwoHourMapping.isEmpty()) {
-            AdStatOfHourDWD lastTwoHourDWD = lastTwoHourMapping.get(lastTwoHour);
-            if (lastTwoHourDWD != null) {
-                costLastTwoHour = lastTwoHourDWD.getCostHour();
-                costLastHourDiff = costLastHour - lastTwoHourDWD.getCostHour();
-            }
-        }
-        long costLastTwoHourDiff = 0, costLastThreeTrend = 0;
-        Map<Integer, AdStatOfHourDWD> lastThreeHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastThreeHourDay));
-        if (lastThreeHourMapping != null && !lastThreeHourMapping.isEmpty()) {
-            AdStatOfHourDWD lastThreeHourDWD = lastThreeHourMapping.get(lastThreeHour);
-            if (lastThreeHourDWD != null) {
-                costLastThreeTrend = costLastHourDiff > 0 && costDiff > 0 ? 1 : 0;
-                costLastTwoHourDiff = costLastTwoHour - lastThreeHourDWD.getCostHour();
-            }
-        }
-
-        if (historyReduceState.get(adStatOfHourDWD.getStatDay()) == null) {
-            Map<Integer, AdStatOfHourDWD> hourMapping = new HashMap<>(24);
-            hourMapping.put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
-            historyReduceState.put(adStatOfHourDWD.getStatDay(), hourMapping);
-        } else {
-            historyReduceState.get(adStatOfHourDWD.getStatDay()).put(adStatOfHourDWD.getHour(), adStatOfHourDWD);
-        }
-
-        CostHourDM costHourDM = dataChange(adStatOfHourDWD);
-        costHourDM.setCostDiff(costDiff);
-        costHourDM.setCostLastHour(costLastHour);
-        costHourDM.setCostLastHourDiff(costLastHourDiff);
-        costHourDM.setCostLastTwoHour(costLastTwoHour);
-        costHourDM.setCostLastTwoHourDiff(costLastTwoHourDiff);
-        costHourDM.setCostLastThreeTrend(costLastThreeTrend);
-
-        collector.collect(costHourDM);
-        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(2L)));
-        historyReduceState.remove(DateUtil.formatLocalDate(day.minusDays(3L)));
-
-    }
 }