Sfoglia il codice sorgente

ADD:数据回滚--年、天

cxyu 3 anni fa
parent
commit
8cd9ca9571

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -116,7 +116,6 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         // 聚合当天的全部数据
         AdStatOfMinuteDWD newAdStat = AdStatOfMinuteDWD.reduce(beforeYesterdayDayDWD, yesterdayMinuteDWD, adDataOfMinuteODSList, statODS, lastReduceData, now);
         collector.collect(newAdStat);
-        System.out.println("输出:" + JsonUtil.toString(newAdStat));
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);
 

+ 10 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -1,6 +1,7 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
@@ -23,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
-public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow> {
+public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, TimeWindow> {
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
     private Connection connection = null;
     private int minutenow = 1;
@@ -37,7 +38,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, Co
     }
 
     //数据格式转换
-    public CostHourDM datachange(AdStatOfMinuteDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
+    public CostHourDM datachange(AdStatOfHourDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
         //时间-天
         costHourDM.dt = adStatOfMinuteDWD.getStatDay();
         //计划 id
@@ -45,7 +46,8 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, Co
         //时间- real
         costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
         //时间-小时
-        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(new Date(adStatOfMinuteDWD.getStatTime()));
+        //TODO:之后需要进一步修改
+        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(adStatOfMinuteDWD.getCreateTime());
         costHourDM.hour = tmpHour;
         //广告id
         costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
@@ -206,8 +208,8 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, Co
 
 
     @Override
-    public void process(Long elementCount, ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context,
-                        Iterable<AdStatOfMinuteDWD> iterable, Collector<CostHourDM> collector) throws Exception {
+    public void process(Long elementCount, ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, TimeWindow>.Context context,
+                        Iterable<AdStatOfHourDWD> iterable, Collector<CostHourDM> collector) throws Exception {
         long beginTime = context.window().getStart();
         LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
         LocalDate beginDate = beginDateTime.toLocalDate();
@@ -228,9 +230,9 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, Co
         //获取前几分钟
 
 
-        List<AdStatOfMinuteDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
+        List<AdStatOfHourDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
 
-        for (AdStatOfMinuteDWD adStatOfMinuteDWD : iterable) {
+        for (AdStatOfHourDWD adStatOfMinuteDWD : iterable) {
             adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
             CostHourDM costHourDM = new CostHourDM();
             if (adStatOfMinuteDWD.getHour() != hourInt) {
@@ -267,7 +269,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, Co
     }
 
     @Override
-    public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context) throws Exception {
+    public void clear(ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, TimeWindow>.Context context) throws Exception {
         System.out.println("窗口关闭");
     }