Quellcode durchsuchen

ADD:数据回滚--年、天

cxyu vor 3 Jahren
Ursprung
Commit
bb51efb11c

+ 43 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -17,11 +17,13 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
 import flink.zanxiangnet.ad.monitoring.process.AdMinuteDWDProcess;
+import flink.zanxiangnet.ad.monitoring.process.CostHourDayProcess;
 import flink.zanxiangnet.ad.monitoring.process.CostHourProcess;
 import flink.zanxiangnet.ad.monitoring.process.CostMinuteProcess;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
+import flink.zanxiangnet.ad.monitoring.trigger.CostHourDMStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.trigger.CostMinuteDMStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
@@ -157,11 +159,12 @@ public class AdStatJob {
                         .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                         .trigger(new CostMinuteDMStreamTrigger())
                         .process(new CostMinuteProcess())
-                        .name("sink_ad_hour_dm_clickhouse");
+                        .name("sink_ad_minute_dm_clickhouse");
 
         BatchSinkMinute batchSinkMinute = new BatchSinkMinute();
         clickhouseMinuteDmStream.addSink(batchSinkMinute);
 
+
         // 小时流(直接写到小时报表的 ods)
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 写入原始表
@@ -238,6 +241,19 @@ public class AdStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfHourDWD.class))
                 .name("sink_ad_hour_dwd");
 
+        //小时数据
+        SingleOutputStreamOperator<CostHourDM> clickhouseHourDmStream =
+                adHourDWDStream
+                        .keyBy(AdStatOfHourDWD::getAdId)
+                        .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
+                        .trigger(new CostHourDMStreamTrigger())
+                        .process(new CostHourProcess())
+                        .name("sink_ad_hour_dm_clickhouse");
+
+        clickhouseHourDmStream.print();
+        BatchSinkHour batchSinkhour = new BatchSinkHour();
+        clickhouseHourDmStream.addSink(batchSinkhour);
+
 
         // ------------------------------------------------------- 处理广告的天数据 -----------------------------------------
         KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
@@ -360,6 +376,19 @@ public class AdStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .name("sink_ad_day_dwd");
 
+        //小时数据-----天
+        SingleOutputStreamOperator<CostHourDM> clickhouseHourDayDmStream =
+                adDayDWDStream
+                        .keyBy(AdStatOfDayDWD::getAdId)
+                        .countWindow(1)
+                        .process(new CostHourDayProcess())
+                        .name("sink_ad_hour_day_dm_clickhouse");
+
+        clickhouseHourDayDmStream.print();
+        BatchSinkHour batchSinkHourDay = new BatchSinkHour();
+        clickhouseHourDayDmStream.addSink(batchSinkHourDay);
+
+
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
                 .keyBy(AdDataOfDayODS::getAdId)
                 .countWindow(1).process(new ProcessWindowFunction<AdDataOfDayODS, AdStatOfDayDWD, Long, GlobalWindow>() {
@@ -384,7 +413,19 @@ public class AdStatJob {
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .name("sink_ad_year_dwd");
-
+        //小时数据----年
+        SingleOutputStreamOperator<CostHourDM> clickhouseHourYearDmStream =
+                adDayDWDYearStream
+                        .keyBy(AdStatOfDayDWD::getAdId)
+                        .countWindow(1)
+                        .process(new CostHourDayProcess())
+                        .name("sink_ad_hour_year_dm_clickhouse");
+
+        clickhouseHourYearDmStream.print();
+        BatchSinkHour batchSinkHourYear = new BatchSinkHour();
+        clickhouseHourYearDmStream.addSink(batchSinkHourYear);
+
+        //TODO:jobname----后面应该带个时间以此来区分log以及job
         env.execute("job_ad");
     }
 }

+ 308 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -0,0 +1,308 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow> {
+    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
+    private Connection connection = null;
+
+
+    @Override
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException {
+        ClickhouseUtil clickhouseUtil = new ClickhouseUtil();
+        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+                "8123", "data_monitoring");
+    }
+
+    //数据格式转换
+    public CostHourDM datachange(AdStatOfDayDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
+        //时间-天
+        costHourDM.dt = adStatOfMinuteDWD.getStatDay();
+        //计划 id
+        costHourDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
+        //时间- real
+        costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
+        //时间-小时
+        //TODO:之后需要进一步修改
+        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(adStatOfMinuteDWD.getCreateTime());
+        costHourDM.hour = tmpHour;
+        //广告id
+        costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        //广告组id
+        costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
+        //创意id
+        costHourDM.adcreativeId = "";
+        //账号id
+        costHourDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
+        //总消耗
+        costHourDM.costTotal = adStatOfMinuteDWD.getCostTotal();
+        //当天消耗
+        costHourDM.costDay = adStatOfMinuteDWD.getCostDay();
+        //当天小时消耗
+        //TODO:数据--小时会缺失,下面的小时数据同理
+        costHourDM.costHour = 0;
+
+        //消耗速度
+        //TODO:如果这个回滚发生在24点有问题,会覆盖掉24点的小时速度
+        costHourDM.costSpeed = 0;
+
+        //总浏览量
+        costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
+        //天-总浏览量
+        costHourDM.viewCountDay = adStatOfMinuteDWD.getViewCountDay();
+        //小时-总浏览量
+        costHourDM.viewCountHour = 0;
+        //总平均千次曝光成本
+        costHourDM.thousandDisplayPriceAll = adStatOfMinuteDWD.getThousandDisplayPriceAll();
+        //天-总平均曝光成本
+        costHourDM.thousandDisplayPriceDay = adStatOfMinuteDWD.getThousandDisplayPriceDay();
+        //小时-总平均曝光成本
+        costHourDM.thousandDisplayPriceHour = 0;
+        //总点击量
+        costHourDM.validClickCountTotal = adStatOfMinuteDWD.getValidClickCountTotal();
+        //天-总点击量
+        costHourDM.validClickCountDay = adStatOfMinuteDWD.getValidClickCountDay();
+        //小时-总点击量
+        costHourDM.validClickCountHour = 0;
+        //总平均点击率
+        costHourDM.ctrAll = adStatOfMinuteDWD.getCtrAll();
+        //天-总平均点击率
+        costHourDM.ctrDay = adStatOfMinuteDWD.getCtrDay();
+        //小时-总平均点击率
+        costHourDM.ctrHour = 0;
+        //总点击均价
+        costHourDM.cpcAll = adStatOfMinuteDWD.getCpcAll();
+        //天-总点击均价
+        costHourDM.cpcDay = adStatOfMinuteDWD.getCpcDay();
+        //小时-总点击均价
+        costHourDM.cpcHour = 0;
+        //总目标转化量
+        costHourDM.conversionsCountTotal = adStatOfMinuteDWD.getConversionsCountTotal();
+        //天-总目标转化量
+        costHourDM.conversionsCountDay = adStatOfMinuteDWD.getConversionsCountDay();
+        //小时-总目标转化量
+        costHourDM.conversionsCountHour = 0;
+        //总目标平均转化成本
+        costHourDM.conversionsCostTotal = adStatOfMinuteDWD.getConversionsCostAll();
+        //天-总目标平均转化成本
+        costHourDM.conversionsCostDay = adStatOfMinuteDWD.getConversionsCostDay();
+        //小时-总目标平均转化成本
+        costHourDM.conversionsCostHour = 0;
+        //总平均转化率
+        costHourDM.conversionsRateAll = adStatOfMinuteDWD.getConversionsRateAll();
+        //天-总平均转化率
+        costHourDM.conversionsRateDay = adStatOfMinuteDWD.getConversionsRateDay();
+        //小时-总平均转化率
+        costHourDM.conversionsRateHour = 0;
+        //TODO:总首日下单roi
+        costHourDM.firstDayOrderRoiTotal = 0;
+        //天-总首日下单roi
+        costHourDM.firstDayOrderRoiDay = 0;
+        //小时-总首日下单roi
+        costHourDM.firstDayOrderRoiHour = 0;
+        //总首日下单金额
+        costHourDM.firstDayOrderAmountTotal = adStatOfMinuteDWD.getFirstDayOrderAmountTotal();
+        //天-总首日下单金额
+        costHourDM.firstDayOrderAmountDay = adStatOfMinuteDWD.getFirstDayOrderAmountDay();
+        //小时-总首日下单金额
+        costHourDM.firstDayOrderAmountHour = 0;
+        //总首日下单量
+        costHourDM.firstDayOrderCountTotal = adStatOfMinuteDWD.getFirstDayOrderCountTotal();
+        //天-总首日下单量
+        costHourDM.firstDayOrderCountDay = adStatOfMinuteDWD.getFirstDayOrderCountDay();
+        //小时-总首日下单量
+        costHourDM.firstDayOrderCountHour = 0;
+        //总下单金额
+        costHourDM.webOrderAmountTotal = adStatOfMinuteDWD.getOrderAmountTotal();
+        //天-总下单金额
+        costHourDM.webOrderAmountDay = adStatOfMinuteDWD.getOrderAmountDay();
+        //小时-总下单金额
+        costHourDM.webOrderAmountHour = 0;
+        //总平均下单成本
+        costHourDM.webOrderCostTotal = adStatOfMinuteDWD.getWebOrderCostAll();
+        //天-总平均下单成本
+        costHourDM.webOrderCostDay = adStatOfMinuteDWD.getWebOrderCostDay();
+        //小时-总平均下单成本
+        costHourDM.webOrderCostHour = 0;
+        //总平均下单率
+        costHourDM.webOrderRateTotal = adStatOfMinuteDWD.getOrderRateAll();
+        //天-总平均下单率
+        costHourDM.webOrderRateDay = adStatOfMinuteDWD.getOrderRateDay();
+        //小时-总平均下单率
+        costHourDM.webOrderRateHour = 0;
+        //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
+        costHourDM.webOrderCountTotal = adStatOfMinuteDWD.getOrderCountTotal();
+        //天-总平均下单量
+        costHourDM.webOrderCountDay = adStatOfMinuteDWD.getOrderCountDay();
+        //小时-总平均下单量
+        costHourDM.webOrderCountHour = 0;
+        //总下单ROI
+        costHourDM.orderRoiTotal = adStatOfMinuteDWD.getOrderRoiAll();
+        //天-总下单roi
+        costHourDM.orderRoiDay = adStatOfMinuteDWD.getOrderRoiDay();
+        //小时-总下单roi
+        costHourDM.orderRoiHour = 0;
+        //总平均下单客单价
+        costHourDM.orderUnitPriceTotal = adStatOfMinuteDWD.getOrderUnitPriceAll();
+        //天-总平均下单客单价
+        costHourDM.orderUnitPriceDay = adStatOfMinuteDWD.getOrderUnitPriceDay();
+        //小时-总平均下单客单价
+        costHourDM.orderUnitPriceHour = 0;
+        //总公众号关注量
+        costHourDM.fromFollowUvTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        //天-总公众号关注量
+        costHourDM.fromFollowUvDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        //小时-总公众号关注量
+        costHourDM.fromFollowUvHour = 0;
+        //TODO:总平均公众号关注成本---是否是价格/关注
+        costHourDM.fromFollowCostTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        //天-总平均公众号关注成本
+        costHourDM.fromFollowCostDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        //小时-总平均公众号关注成本
+        costHourDM.fromFollowCostHour = 0;
+        //TODO:总平均公众号关注率----确认是否对应
+        costHourDM.fromFollowRateTotal = adStatOfMinuteDWD.getOfficialAccountFollowRateAll();
+        //天-总平均公众号关注率
+        costHourDM.fromFollowRateDay = adStatOfMinuteDWD.getOfficialAccountFollowRateDay();
+        //小时-总平均公众号关注率
+        costHourDM.fromFollowRateHour = 0;
+        //TODO:总注册数-----下面全是有问题的
+        costHourDM.webRegisterCountTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        //天-总注册数
+        costHourDM.webRegisterCountDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        //小时-总注册数
+        costHourDM.webRegisterCountHour = 0;
+        //总注册人数
+        costHourDM.webRegisterUvTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        //天-总注册人数
+        costHourDM.webRegisterUvDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        //小时-总注册人数
+        costHourDM.webRegisterUvHour = 0;
+        //总平均注册成本
+        costHourDM.webRegisterCostTotal = adStatOfMinuteDWD.getOfficialAccountRegisterCostAll();
+        //天-总平均注册成本
+        costHourDM.webRegisterCostDay = adStatOfMinuteDWD.getOfficialAccountRegisterCostDay();
+        //小时-总平均注册成本
+        costHourDM.webRegisterCostHour = 0;
+        return costHourDM;
+    }
+
+
+    @Override
+    public void process(Long elementCount, ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow>.Context context,
+                        Iterable<AdStatOfDayDWD> iterable, Collector<CostHourDM> collector) throws Exception {
+
+        List<AdStatOfDayDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
+
+        for (AdStatOfDayDWD adStatOfMinuteDWD : iterable) {
+            adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
+            CostHourDM costHourDM = new CostHourDM();
+
+            String tmpHour = new SimpleDateFormat("yyyy-MM-dd 24:00:00").format(adStatOfMinuteDWD.getCreateTime());
+
+            String sql = "select " +
+                    "cost_speed," +
+                    "cost_hour ,\n" +
+                    "view_count_hour ," +
+                    "thousand_display_price_hour ,\n" +
+                    "valid_click_count_hour ," +
+                    "ctr_hour ,\n" +
+                    "cpc_hour ," +
+                    "conversions_count_hour ,\n" +
+                    "conversions_cost_hour ," +
+                    "conversions_rate_hour ,\n" +
+                    "first_day_order_roi_hour ," +
+                    "first_day_order_amount_hour ,\n" +
+                    "first_day_order_count_hour ," +
+                    "web_order_amount_hour ,\n" +
+                    "web_order_cost_hour ," +
+                    "web_order_rate_hour ,\n" +
+                    "web_order_count_hour ," +
+                    "order_roi_hour ,\n" +
+                    "order_unit_price_hour ," +
+                    "from_follow_uv_hour ,\n" +
+                    "from_follow_cost_hour ," +
+                    "from_follow_rate_hour ,\n" +
+                    "web_register_count_hour ," +
+                    "web_register_uv_hour ," +
+                    "web_register_cost_hour,\n" +
+                    "cost_last_hour ," +
+                    "cost_last_two_hour ," +
+                    "cost_last_hour_diff ," +
+                    "cost_last_three_trend " +
+                    "from data_monitoring.cost_hour\n" +
+                    "where ad_id ='" + adStatOfMinuteDWD.getAdId() + "' \n" +
+                    "and dt='" + adStatOfMinuteDWD.getStatDay() + "' and hour='" + tmpHour + "'";
+
+
+            System.out.println(sql);
+            Statement statement = connection.createStatement();
+            ResultSet rs = statement.executeQuery(sql);
+            while (rs.next()) {
+                costHourDM.costSpeed = rs.getDouble(1);
+                costHourDM.costHour = rs.getLong(2);
+                costHourDM.viewCountHour = rs.getLong(3);
+                costHourDM.thousandDisplayPriceHour = rs.getLong(4);
+                costHourDM.validClickCountHour = rs.getLong(5);
+                costHourDM.ctrHour = rs.getDouble(6);
+                costHourDM.cpcHour = rs.getLong(7);
+                costHourDM.conversionsCountHour = rs.getLong(8);
+                costHourDM.conversionsCostHour = rs.getLong(9);
+                costHourDM.conversionsRateHour = rs.getDouble(10);
+                costHourDM.firstDayOrderRoiHour = rs.getDouble(11);
+                costHourDM.firstDayOrderAmountHour = rs.getLong(12);
+                costHourDM.firstDayOrderCountHour = rs.getLong(13);
+                costHourDM.webOrderAmountHour = rs.getLong(14);
+                costHourDM.webOrderCostHour = rs.getLong(15);
+                costHourDM.webOrderRateHour = rs.getDouble(16);
+                costHourDM.webOrderCountHour = rs.getLong(17);
+                costHourDM.orderRoiHour = rs.getDouble(18);
+                costHourDM.orderUnitPriceHour = rs.getLong(19);
+                costHourDM.fromFollowUvHour = rs.getLong(20);
+                costHourDM.fromFollowCostHour = rs.getLong(21);
+                costHourDM.fromFollowRateHour = rs.getDouble(22);
+                costHourDM.webRegisterCountHour = rs.getLong(23);
+                costHourDM.webRegisterUvHour = rs.getLong(24);
+                costHourDM.webRegisterCostHour = rs.getDouble(25);
+                costHourDM.costLastHour = rs.getLong(26);
+                costHourDM.costLastTwoHour = rs.getLong(27);
+                costHourDM.costLastHourDiff = rs.getLong(28);
+                costHourDM.costLastThreeTrend = rs.getLong(29);
+            }
+            CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
+
+            collector.collect(costHourDM_new);
+            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
+        }
+        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
+
+
+    }
+
+    @Override
+    public void clear(ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow>.Context context) throws Exception {
+        System.out.println("窗口关闭");
+    }
+
+}

+ 38 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/CostHourDMStreamTrigger.java

@@ -0,0 +1,38 @@
+package flink.zanxiangnet.ad.monitoring.trigger;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class CostHourDMStreamTrigger extends Trigger<AdStatOfHourDWD, TimeWindow> {
+    @Override
+    public TriggerResult onElement(AdStatOfHourDWD adStatOfHourDWD, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        // FIXME 水印时间没有意义!拿到数据是 Long.MAX
+        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
+            // 到了窗口的最大生命周期
+            return TriggerResult.FIRE_AND_PURGE;
+        }
+        triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
+        if (adStatOfHourDWD.getHour() == DateUtil.milliToLocalDateTime(time).getHour()) {
+            return TriggerResult.FIRE;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onProcessingTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        return time == timeWindow.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
+    }
+}