wcc %!s(int64=3) %!d(string=hai) anos
pai
achega
9e19b0aaeb

+ 8 - 17
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStatJob.java

@@ -18,8 +18,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
 import java.time.Duration;
@@ -90,28 +92,12 @@ public class AdHourStatJob {
         SingleOutputStreamOperator<CostMinuterDM> clickhouseMinuteDmStream =
                 adMinuteDWDStream
                         .keyBy(AdStatOfMinuteDWD::getAdId)
-                        .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
-                        .trigger(new CostMinuteDMStreamTrigger())
                         .process(new CostMinuteProcess())
                         .name("sink_ad_minute_dm_clickhouse");
 
         BatchSinkMinute batchSinkMinute = new BatchSinkMinute();
         clickhouseMinuteDmStream.addSink(batchSinkMinute);
 
-
-        //cost----小时数据处理
-        /*SingleOutputStreamOperator<CostHourDM> clickhouseMinuteHourDmStream =
-                adMinuteDWDStream
-                        .keyBy(AdStatOfMinuteDWD::getAdId)
-                        .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
-                        .trigger(new CostMinuteDMStreamTrigger())
-                        .process(new CostMinuteHourProcess())
-                        .name("sink_ad_minute_hour_dm_clickhouse");
-
-        BatchSinkHour batchSinkMinuteHour = new BatchSinkHour();
-        clickhouseMinuteHourDmStream.addSink(batchSinkMinuteHour);*/
-
-
         // 小时流
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 小时流-写入原始表
@@ -120,12 +106,17 @@ public class AdHourStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
                 .name("sink_ad_hour_ods");
 
+
         // 小时流-计算
         SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream =
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
                         .countWindow(1)
                         .process(new AdHourDWDProcess());
-        DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.getSideOutput(adHourFromMinuteStreamTag).union(adHourDWDStream);
+
+        DataStream<AdStatOfHourDWD> adHourDWDAllStream = adMinuteDWDStream.getSideOutput(adHourFromMinuteStreamTag)
+                .keyBy(AdStatOfHourDWD::getAdId)
+                .process(new HourStreamCompletionProcess())
+                .union(adHourDWDStream);
         new KeyedBatchStream<>("adHourDWDStream", adHourDWDAllStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))

+ 82 - 85
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -1,39 +1,102 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.text.SimpleDateFormat;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.*;
 
-public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow> {
-    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
-    private Connection connection = null;
+public class CostMinuteProcess extends KeyedProcessFunction<Long, AdStatOfMinuteDWD, CostMinuterDM> {
     private int minutenow = 1;
+    // 之前聚合的昨天的数据
+    private MapState<String, Map<Integer, AdStatOfMinuteDWD>> historyReduceState;
 
     @Override
-    public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
-        Properties props = new Properties();
-        props.load(CostMinuteProcess.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props);
+    public void open(Configuration conf) throws Exception {
+        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfMinuteDWD.class))));
     }
 
-    //数据格式转换
-    public CostMinuterDM datachange(AdStatOfMinuteDWD adStatOfMinuteDWD, CostMinuterDM costMinuterDM) {
+    @Override
+    public void processElement(AdStatOfMinuteDWD adStatOfMinuteDWD, KeyedProcessFunction<Long, AdStatOfMinuteDWD, CostMinuterDM>.Context context,
+                               Collector<CostMinuterDM> collector) throws Exception {
+        LocalDate day = DateUtil.parseLocalDate(adStatOfMinuteDWD.getStatDay());
+        Integer hour = adStatOfMinuteDWD.getHour();
+
+        LocalDate lastHourDay = day, lastTwoHourDay = day, lastThreeHourDay = day;
+        int lastHour = hour - 1, lastTwoHour = hour - 2, lastThreeHour = hour - 3;
+        if (hour == 0) {
+            lastHourDay = day.minusDays(1L);
+            lastHour = 23;
+            lastTwoHourDay = day.minusDays(1L);
+            lastTwoHour = 22;
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 21;
+        } else if (hour == 1) {
+            lastTwoHourDay = day.minusDays(1L);
+            lastTwoHour = 23;
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 22;
+        } else if (hour == 2) {
+            lastThreeHourDay = day.minusDays(1L);
+            lastThreeHour = 23;
+        }
+        long costDiff = 0L, costLastHour = 0L;
+        Map<Integer, AdStatOfMinuteDWD> lastHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastHourDay));
+        if (lastHourMapping != null && !lastHourMapping.isEmpty()) {
+            AdStatOfMinuteDWD lastHourDWD = lastHourMapping.get(lastHour);
+            if (lastHourDWD != null) {
+                costLastHour = lastHourDWD.getCostHour();
+                costDiff = adStatOfMinuteDWD.getCostHour() - lastHourDWD.getCostHour();
+            }
+        }
+        long costLastHourDiff = 0, costLastTwoHour = 0;
+        Map<Integer, AdStatOfMinuteDWD> lastTwoHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastTwoHourDay));
+        if (lastTwoHourMapping != null && !lastTwoHourMapping.isEmpty()) {
+            AdStatOfMinuteDWD lastTwoHourDWD = lastTwoHourMapping.get(lastTwoHour);
+            if (lastTwoHourDWD != null) {
+                costLastTwoHour = lastTwoHourDWD.getCostHour();
+                costLastHourDiff = costLastHour - lastTwoHourDWD.getCostHour();
+            }
+        }
+        long costLastTwoHourDiff = 0, costLastThreeTrend = 0;
+        Map<Integer, AdStatOfMinuteDWD> lastThreeHourMapping = historyReduceState.get(DateUtil.formatLocalDate(lastThreeHourDay));
+        if (lastThreeHourMapping != null && !lastThreeHourMapping.isEmpty()) {
+            AdStatOfMinuteDWD lastThreeHourDWD = lastThreeHourMapping.get(lastThreeHour);
+            if (lastThreeHourDWD != null) {
+                costLastThreeTrend = lastThreeHourDWD.getCostHour();
+                costLastTwoHourDiff = costLastTwoHour - lastThreeHourDWD.getCostHour();
+            }
+        }
 
+        if (historyReduceState.get(adStatOfMinuteDWD.getStatDay()) == null) {
+            Map<Integer, AdStatOfMinuteDWD> hourMapping = new HashMap<>(24);
+            hourMapping.put(adStatOfMinuteDWD.getHour(), adStatOfMinuteDWD);
+            historyReduceState.put(adStatOfMinuteDWD.getStatDay(), hourMapping);
+        } else {
+            historyReduceState.get(adStatOfMinuteDWD.getStatDay()).put(adStatOfMinuteDWD.getHour(), adStatOfMinuteDWD);
+        }
+
+        CostMinuterDM costMinuterDM = dataChange(adStatOfMinuteDWD);
+        costMinuterDM.setCostDiff(costDiff);
+        costMinuterDM.setCostLastHour(costLastHour);
+        costMinuterDM.setCostLastHourDiff(costLastHourDiff);
+        costMinuterDM.setCostLastTwoHour(costLastTwoHour);
+        costMinuterDM.setCostLastTwoHourDiff(costLastTwoHourDiff);
+        costMinuterDM.setCostLastThreeTrend(costLastThreeTrend);
+
+        collector.collect(costMinuterDM);
+    }
+
+    //数据格式转换
+    private CostMinuterDM dataChange(AdStatOfMinuteDWD adStatOfMinuteDWD) {
+        CostMinuterDM costMinuterDM = new CostMinuterDM();
         //时间-天
         costMinuterDM.dt = adStatOfMinuteDWD.getStatDay();
         //计划 id
@@ -110,70 +173,4 @@ public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD,
         return costMinuterDM;
     }
 
-
-    @Override
-    public void process(Long elementCount, ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow>.Context context,
-                        Iterable<AdStatOfMinuteDWD> iterable, Collector<CostMinuterDM> collector) throws Exception {
-        long beginTime = context.window().getStart();
-        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
-        LocalDate beginDate = beginDateTime.toLocalDate();
-        //当前天
-        String statDay = DateUtil.formatLocalDate(beginDate);
-        System.out.println(statDay);
-        //当前小时
-        int hourInt = beginDateTime.getHour();
-        String hour = beginDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-        String lastHour = beginDateTime.minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-        String lastTwoHour = beginDateTime.minusHours(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-        String lastThreeHour = beginDateTime.minusHours(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-
-        this.minutenow = (beginDateTime.getMinute() % 5) == 0 ? 1 : (beginDateTime.getMinute() % 5);
-        long now = System.currentTimeMillis();
-
-        //获取前几分钟
-        List<AdStatOfMinuteDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
-
-        for (AdStatOfMinuteDWD adStatOfMinuteDWD : iterable) {
-            adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
-            CostMinuterDM costMinuterDM = new CostMinuterDM();
-
-            if (adStatOfMinuteDWD.getHour() != hourInt) {
-                continue;
-            }
-
-            String adId = adStatOfMinuteDWD.getAdId().toString();
-            String sql = "select " +
-                    "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
-                    "if(hour='" + lastTwoHour + "',cost_hour,0) last_two_hour_cost, " +
-                    "if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0) cost_last_hour_diff, " +
-                    "(if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0))*(if(hour='" + lastTwoHour + "',cost_hour,0) - if(hour='" + lastThreeHour + "',cost_hour,0)) cost_last_three_trend " +
-                    "from data_monitoring.cost_hour ch " +
-                    "where dt='" + statDay + "' and ad_id='" + adId + "' ";
-
-            System.out.println(sql);
-            Statement statement = connection.createStatement();
-            ResultSet rs = statement.executeQuery(sql);
-            while (rs.next()) {
-                costMinuterDM.costLastHour = rs.getLong(1);
-                costMinuterDM.costLastTwoHour = rs.getLong(2);
-                costMinuterDM.costLastHourDiff = rs.getLong(3);
-                costMinuterDM.costLastThreeTrend = rs.getLong(4);
-
-            }
-
-
-            CostMinuterDM CostMinuterDM_new = datachange(adStatOfMinuteDWD, costMinuterDM);
-
-            collector.collect(CostMinuterDM_new);
-            // System.out.println("costminute_输出:" + JsonUtil.toString(CostMinuterDM_new));
-        }
-        // System.out.println("costminute_windowCount:" + adStatOfMinuteDWDlist.size());
-
-
-    }
-
-    @Override
-    public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow>.Context context) throws Exception {
-    }
-
 }

+ 195 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/HourStreamCompletionProcess.java

@@ -0,0 +1,195 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.springframework.beans.BeanUtils;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Date;
+
+/**
+ * 小时流数据补全,把中间没消耗的时间段填充 0
+ */
+public class HourStreamCompletionProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD> {
+
+    private ValueState<AdStatOfHourDWD> lastReduceState;
+
+    @Override
+    public void open(Configuration conf) {
+        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(AdStatOfHourDWD.class)));
+    }
+
+    @Override
+    public void processElement(AdStatOfHourDWD adStatOfHourDWD, KeyedProcessFunction<Long, AdStatOfHourDWD, AdStatOfHourDWD>.Context context, Collector<AdStatOfHourDWD> collector) throws Exception {
+        AdStatOfHourDWD lastReduce = lastReduceState.value();
+        if (lastReduce == null) {
+            lastReduceState.update(adStatOfHourDWD);
+            collector.collect(adStatOfHourDWD);
+            return;
+        }
+        LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(adStatOfHourDWD.getStatDay()), LocalTime.of(adStatOfHourDWD.getHour(), 0, 0));
+        LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
+        if (lastStatDateTime.compareTo(statDateTime) >= 0) {
+            return;
+        }
+        long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
+        if (hours > 1) {
+            // 中间有没数据的时间段,需要进行数据填充
+            for (int i = 1; i < hours; i++) {
+                // 要填充的时间
+                LocalDateTime completionTime = lastStatDateTime.plusHours(i);
+                collector.collect(completion(completionTime, lastReduce));
+            }
+        }
+        collector.collect(adStatOfHourDWD);
+        lastReduceState.update(adStatOfHourDWD);
+    }
+
+    private AdStatOfHourDWD completion(LocalDateTime completionTime, AdStatOfHourDWD lastReduceData) {
+        String statDay = DateUtil.formatLocalDate(completionTime.toLocalDate());
+        int hour = completionTime.getHour();
+        AdStatOfHourDWD result = new AdStatOfHourDWD();
+        BeanUtils.copyProperties(lastReduceData, result);
+        result.setCreateTime(new Date());
+        result.setStatDay(statDay);
+        result.setHour(hour);
+        AdStatOfHourDWD.initValue(result);
+
+        result.setCostDeviationRateTotal(lastReduceData.getCostDeviationRateTotal());
+        result.setCostTotal(lastReduceData.getCostTotal());
+        result.setCompensationAmountTotal(lastReduceData.getCompensationAmountTotal());
+        result.setViewCountTotal(lastReduceData.getViewCountTotal());
+        result.setThousandDisplayPriceAll(lastReduceData.getThousandDisplayPriceAll());
+        result.setValidClickCountTotal(lastReduceData.getValidClickCountTotal());
+        result.setCtrAll(lastReduceData.getCtrAll());
+        result.setCpcAll(lastReduceData.getCpcAll());
+        result.setValuableClickCountTotal(lastReduceData.getValuableClickCountTotal());
+        result.setValuableClickRateAll(lastReduceData.getValuableClickRateAll());
+        result.setValuableClickCostAll(lastReduceData.getValuableClickCostAll());
+        result.setConversionsCountTotal(lastReduceData.getConversionsCountTotal());
+        result.setConversionsCostAll(lastReduceData.getConversionsCostAll());
+        result.setConversionsRateAll(lastReduceData.getConversionsRateAll());
+        result.setDeepConversionsCountTotal(lastReduceData.getDeepConversionsCountTotal());
+        result.setDeepConversionsCostAll(lastReduceData.getDeepConversionsCostAll());
+        result.setDeepConversionsRateAll(lastReduceData.getDeepConversionsRateAll());
+        result.setOrderCountTotal(lastReduceData.getOrderCountTotal());
+        result.setFirstDayOrderCountTotal(lastReduceData.getFirstDayOrderCountTotal());
+        result.setWebOrderCostAll(lastReduceData.getWebOrderCostAll());
+        result.setOrderRateAll(lastReduceData.getOrderRateAll());
+        result.setOrderAmountTotal(lastReduceData.getOrderAmountTotal());
+        result.setFirstDayOrderAmountTotal(lastReduceData.getFirstDayOrderAmountTotal());
+        result.setOrderUnitPriceAll(lastReduceData.getOrderUnitPriceAll());
+        result.setOrderRoiAll(lastReduceData.getOrderRoiAll());
+        result.setSignInCountTotal(lastReduceData.getSignInCountTotal());
+        result.setScanFollowCountTotal(lastReduceData.getScanFollowCountTotal());
+        result.setWechatAppRegisterUvTotal(lastReduceData.getWechatAppRegisterUvTotal());
+        result.setWechatMinigameRegisterCostAll(lastReduceData.getWechatMinigameRegisterCostAll());
+        result.setWechatMinigameRegisterRateAll(lastReduceData.getWechatMinigameRegisterRateAll());
+        result.setWechatMinigameArpuAll(lastReduceData.getWechatMinigameArpuAll());
+        result.setWechatMinigameRetentionCountTotal(lastReduceData.getWechatMinigameRetentionCountTotal());
+        result.setWechatMinigameCheckoutCountTotal(lastReduceData.getWechatMinigameCheckoutCountTotal());
+        result.setWechatMinigameCheckoutAmountTotal(lastReduceData.getWechatMinigameCheckoutAmountTotal());
+        result.setOfficialAccountFollowCountTotal(lastReduceData.getOfficialAccountFollowCountTotal());
+        result.setOfficialAccountFollowRateAll(lastReduceData.getOfficialAccountFollowRateAll());
+        result.setOfficialAccountRegisterUserCountTotal(lastReduceData.getOfficialAccountRegisterUserCountTotal());
+        result.setOfficialAccountRegisterRateAll(lastReduceData.getOfficialAccountRegisterRateAll());
+        result.setOfficialAccountRegisterCostAll(lastReduceData.getOfficialAccountRegisterCostAll());
+        result.setOfficialAccountRegisterAmountTotal(lastReduceData.getOfficialAccountRegisterAmountTotal());
+        result.setOfficialAccountRegisterRoiAll(lastReduceData.getOfficialAccountRegisterRoiAll());
+        result.setOfficialAccountApplyCountTotal(lastReduceData.getOfficialAccountApplyCountTotal());
+        result.setOfficialAccountApplyUserCountTotal(lastReduceData.getOfficialAccountApplyUserCountTotal());
+        result.setOfficialAccountApplyRateAll(lastReduceData.getOfficialAccountApplyRateAll());
+        result.setOfficialAccountApplyCostAll(lastReduceData.getOfficialAccountApplyCostAll());
+        result.setOfficialAccountApplyAmountTotal(lastReduceData.getOfficialAccountApplyAmountTotal());
+        result.setOfficialAccountApplyRoiAll(lastReduceData.getOfficialAccountApplyRoiAll());
+        result.setOfficialAccountOrderCountTotal(lastReduceData.getOfficialAccountOrderCountTotal());
+        result.setOfficialAccountFirstDayOrderCountTotal(lastReduceData.getOfficialAccountFirstDayOrderCountTotal());
+        result.setOfficialAccountOrderUserCountTotal(lastReduceData.getOfficialAccountOrderUserCountTotal());
+        result.setOfficialAccountOrderRateAll(lastReduceData.getOfficialAccountOrderRateAll());
+        result.setOfficialAccountOrderCostAll(lastReduceData.getOfficialAccountOrderCostAll());
+        result.setOfficialAccountOrderAmountTotal(lastReduceData.getOfficialAccountOrderAmountTotal());
+        result.setOfficialAccountFirstDayOrderAmountTotal(lastReduceData.getOfficialAccountFirstDayOrderAmountTotal());
+        result.setOfficialAccountOrderRoiAll(lastReduceData.getOfficialAccountOrderRoiAll());
+        result.setOfficialAccountConsultCountTotal(lastReduceData.getOfficialAccountConsultCountTotal());
+        result.setOfficialAccountReaderCountTotal(lastReduceData.getOfficialAccountReaderCountTotal());
+        result.setOfficialAccountCreditApplyUserCountTotal(lastReduceData.getOfficialAccountCreditApplyUserCountTotal());
+        result.setOfficialAccountCreditUserCountTotal(lastReduceData.getOfficialAccountCreditUserCountTotal());
+        result.setForwardCountTotal(lastReduceData.getForwardCountTotal());
+        result.setForwardUserCountTotal(lastReduceData.getForwardUserCountTotal());
+        result.setNoInterestCountTotal(lastReduceData.getNoInterestCountTotal());
+        result.setNoInterestCountHour(lastReduceData.getNoInterestCountHour());
+
+        if (statDay.equals(lastReduceData.getStatDay())) {
+            result.setCostDeviationRateDay(lastReduceData.getCostDeviationRateDay());
+            result.setCostDay(lastReduceData.getCostDay());
+            result.setCompensationAmountDay(lastReduceData.getCompensationAmountDay());
+            result.setViewCountDay(lastReduceData.getViewCountDay());
+            result.setThousandDisplayPriceDay(lastReduceData.getThousandDisplayPriceDay());
+            result.setValidClickCountDay(lastReduceData.getValidClickCountDay());
+            result.setCtrDay(lastReduceData.getCtrDay());
+            result.setCpcDay(lastReduceData.getCpcDay());
+            result.setValuableClickCountDay(lastReduceData.getValuableClickCountDay());
+            result.setValuableClickRateDay(lastReduceData.getValuableClickRateDay());
+            result.setValuableClickCostDay(lastReduceData.getValuableClickCostDay());
+            result.setConversionsCountDay(lastReduceData.getConversionsCountDay());
+            result.setConversionsCostDay(lastReduceData.getConversionsCostDay());
+            result.setConversionsRateDay(lastReduceData.getConversionsRateDay());
+            result.setDeepConversionsCountDay(lastReduceData.getDeepConversionsCountDay());
+            result.setDeepConversionsCostDay(lastReduceData.getDeepConversionsCostDay());
+            result.setDeepConversionsRateDay(lastReduceData.getDeepConversionsRateDay());
+            result.setOrderCountDay(lastReduceData.getOrderCountDay());
+            result.setFirstDayOrderCountDay(lastReduceData.getFirstDayOrderCountDay());
+            result.setWebOrderCostDay(lastReduceData.getWebOrderCostDay());
+            result.setOrderRateDay(lastReduceData.getOrderRateDay());
+            result.setOrderAmountDay(lastReduceData.getOrderAmountDay());
+            result.setFirstDayOrderAmountDay(lastReduceData.getFirstDayOrderAmountDay());
+            result.setOrderUnitPriceDay(lastReduceData.getOrderUnitPriceDay());
+            result.setOrderRoiDay(lastReduceData.getOrderRoiDay());
+            result.setSignInCountDay(lastReduceData.getSignInCountDay());
+            result.setScanFollowCountDay(lastReduceData.getScanFollowCountDay());
+            result.setWechatAppRegisterUvDay(lastReduceData.getWechatAppRegisterUvDay());
+            result.setWechatMinigameRegisterCostDay(lastReduceData.getWechatMinigameRegisterCostDay());
+            result.setWechatMinigameRegisterRateDay(lastReduceData.getWechatMinigameRegisterRateDay());
+            result.setWechatMinigameArpuDay(lastReduceData.getWechatMinigameArpuDay());
+            result.setWechatMinigameRetentionCountDay(lastReduceData.getWechatMinigameRetentionCountDay());
+            result.setWechatMinigameCheckoutCountDay(lastReduceData.getWechatMinigameCheckoutCountDay());
+            result.setWechatMinigameCheckoutAmountDay(lastReduceData.getWechatMinigameCheckoutAmountDay());
+            result.setOfficialAccountFollowCountDay(lastReduceData.getOfficialAccountFollowCountDay());
+            result.setOfficialAccountFollowRateDay(lastReduceData.getOfficialAccountFollowRateDay());
+            result.setOfficialAccountRegisterUserCountDay(lastReduceData.getOfficialAccountRegisterUserCountDay());
+            result.setOfficialAccountRegisterRateDay(lastReduceData.getOfficialAccountRegisterRateDay());
+            result.setOfficialAccountRegisterCostDay(lastReduceData.getOfficialAccountRegisterCostDay());
+            result.setOfficialAccountRegisterAmountDay(lastReduceData.getOfficialAccountRegisterAmountDay());
+            result.setOfficialAccountRegisterRoiDay(lastReduceData.getOfficialAccountRegisterRoiDay());
+            result.setOfficialAccountApplyCountDay(lastReduceData.getOfficialAccountApplyCountDay());
+            result.setOfficialAccountApplyUserCountDay(lastReduceData.getOfficialAccountApplyUserCountDay());
+            result.setOfficialAccountApplyRateDay(lastReduceData.getOfficialAccountApplyRateDay());
+            result.setOfficialAccountApplyCostDay(lastReduceData.getOfficialAccountApplyCostDay());
+            result.setOfficialAccountApplyAmountDay(lastReduceData.getOfficialAccountApplyAmountDay());
+            result.setOfficialAccountApplyRoiDay(lastReduceData.getOfficialAccountApplyRoiDay());
+            result.setOfficialAccountOrderCountDay(lastReduceData.getOfficialAccountOrderCountDay());
+            result.setOfficialAccountFirstDayOrderCountDay(lastReduceData.getOfficialAccountFirstDayOrderCountDay());
+            result.setOfficialAccountOrderUserCountDay(lastReduceData.getOfficialAccountOrderUserCountDay());
+            result.setOfficialAccountOrderRateDay(lastReduceData.getOfficialAccountOrderRateDay());
+            result.setOfficialAccountOrderCostDay(lastReduceData.getOfficialAccountOrderCostDay());
+            result.setOfficialAccountOrderAmountDay(lastReduceData.getOfficialAccountOrderAmountDay());
+            result.setOfficialAccountFirstDayOrderAmountDay(lastReduceData.getOfficialAccountFirstDayOrderAmountDay());
+            result.setOfficialAccountOrderRoiDay(lastReduceData.getOfficialAccountOrderRoiDay());
+            result.setOfficialAccountConsultCountDay(lastReduceData.getOfficialAccountConsultCountDay());
+            result.setOfficialAccountReaderCountDay(lastReduceData.getOfficialAccountReaderCountDay());
+            result.setOfficialAccountCreditApplyUserCountDay(lastReduceData.getOfficialAccountCreditApplyUserCountDay());
+            result.setOfficialAccountCreditUserCountDay(lastReduceData.getOfficialAccountCreditUserCountDay());
+            result.setForwardCountDay(lastReduceData.getForwardCountDay());
+            result.setForwardUserCountDay(lastReduceData.getForwardUserCountDay());
+            result.setNoInterestCountDay(lastReduceData.getNoInterestCountDay());
+        }
+        return result;
+    }
+}

+ 2 - 11
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/CostMinuteDMStreamTrigger.java

@@ -10,16 +10,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 public class CostMinuteDMStreamTrigger extends Trigger<AdStatOfMinuteDWD, TimeWindow> {
     @Override
     public TriggerResult onElement(AdStatOfMinuteDWD adStatOfMinuteDWD, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
-        // FIXME 水印时间没有意义!拿到数据是 Long.MAX
-        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
-            // 到了窗口的最大生命周期
-            return TriggerResult.FIRE_AND_PURGE;
-        }
-        triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
-        if (adStatOfMinuteDWD.getHour() == DateUtil.milliToLocalDateTime(time).getHour()) {
-            return TriggerResult.FIRE;
-        }
-        return TriggerResult.CONTINUE;
+        return TriggerResult.FIRE;
     }
 
     @Override
@@ -29,7 +20,7 @@ public class CostMinuteDMStreamTrigger extends Trigger<AdStatOfMinuteDWD, TimeWi
 
     @Override
     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
-        return time == timeWindow.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
+        return TriggerResult.FIRE_AND_PURGE;
     }
 
     @Override

+ 12 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/util/DateUtil.java

@@ -2,6 +2,7 @@ package flink.zanxiangnet.ad.monitoring.util;
 
 import java.time.*;
 import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.Date;
 
 public class DateUtil {
@@ -107,4 +108,15 @@ public class DateUtil {
     public static long intervalOfDays(LocalDate beginDate, LocalDate endDate) {
         return endDate.toEpochDay() - beginDate.toEpochDay();
     }
+
+    /**
+     * 2个日期的时间间隔
+     *
+     * @param beginDate
+     * @param endDate
+     * @return
+     */
+    public static long intervalOfHour(LocalDateTime beginDate, LocalDateTime endDate) {
+        return ChronoUnit.HOURS.between(beginDate, endDate);
+    }
 }