Forráskód Böngészése

MOD:数据年修改

cxyu 3 éve
szülő
commit
dbf238cc36

+ 8 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -173,8 +173,10 @@ public class AdStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdDataOfHourODS.class))
                 .name("sink_ad_hour_ods");
 
-        SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream = adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
-                .countWindow(1).process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
+        SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream =
+                adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
+                .countWindow(1).
+                process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
                     private Odps odps;
                     // 上次查询的天数据
                     private ValueState<String> lastQueryDayState;
@@ -246,12 +248,11 @@ public class AdStatJob {
         SingleOutputStreamOperator<CostHourDM> clickhouseHourDmStream =
                 adHourDWDStream
                         .keyBy(AdStatOfHourDWD::getAdId)
-                        .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
-                        .trigger(new CostHourDMStreamTrigger())
+                        .countWindow(1)
                         .process(new CostHourProcess())
                         .name("sink_ad_hour_dm_clickhouse");
 
-        clickhouseHourDmStream.print();
+//        clickhouseHourDmStream.print();
         BatchSinkHour batchSinkhour = new BatchSinkHour();
         clickhouseHourDmStream.addSink(batchSinkhour);
 
@@ -387,7 +388,7 @@ public class AdStatJob {
                         .process(new CostHourDayProcess())
                         .name("sink_ad_hour_day_dm_clickhouse");
 
-        clickhouseHourDayDmStream.print();
+//        clickhouseHourDayDmStream.print();
         BatchSinkHour batchSinkHourDay = new BatchSinkHour();
         clickhouseHourDayDmStream.addSink(batchSinkHourDay);
 
@@ -424,7 +425,7 @@ public class AdStatJob {
                         .process(new CostHourDayProcess())
                         .name("sink_ad_hour_year_dm_clickhouse");
 
-        clickhouseHourYearDmStream.print();
+//        clickhouseHourYearDmStream.print();
         BatchSinkHour batchSinkHourYear = new BatchSinkHour();
         clickhouseHourYearDmStream.addSink(batchSinkHourYear);
 

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -1,5 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
 
+import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -10,6 +11,7 @@ import java.sql.PreparedStatement;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Properties;
 
 public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
     private static final Logger log = LoggerFactory.getLogger(BatchSinkHour.class);
@@ -26,7 +28,9 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
     @Override
     public void open(Configuration config) throws Exception {
         super.open(config);
-        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
                 "8123", "data_monitoring");
     }
 

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -1,5 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
 
+import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -10,6 +11,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Properties;
 
 public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
     private static final Logger log = LoggerFactory.getLogger(BatchSinkMinute.class);
@@ -27,7 +29,9 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
     @Override
     public void open(Configuration config) throws Exception {
         super.open(config);
-        connection=ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
                 "8123", "data_monitoring");
     }
 

+ 8 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -1,5 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
+import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
@@ -12,6 +13,7 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -22,6 +24,7 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow> {
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
@@ -29,9 +32,10 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
 
 
     @Override
-    public void open(Configuration conf) throws SQLException, ClassNotFoundException {
-        ClickhouseUtil clickhouseUtil = new ClickhouseUtil();
-        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
                 "8123", "data_monitoring");
     }
 
@@ -45,8 +49,7 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
         costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
         //时间-小时
         //TODO:之后需要进一步修改
-        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(adStatOfMinuteDWD.getCreateTime());
-        costHourDM.hour = tmpHour;
+        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " 24:00:00";
         //广告id
         costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
         //广告组id

+ 34 - 29
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -1,5 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
+import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
@@ -9,9 +10,11 @@ import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -23,17 +26,18 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Properties;
 
-public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, TimeWindow> {
+public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow> {
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
     private Connection connection = null;
-    private int minutenow = 1;
 
 
     @Override
-    public void open(Configuration conf) throws SQLException, ClassNotFoundException {
-        ClickhouseUtil clickhouseUtil = new ClickhouseUtil();
-        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
                 "8123", "data_monitoring");
     }
 
@@ -47,8 +51,8 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
         costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
         //时间-小时
         //TODO:之后需要进一步修改
-        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(adStatOfMinuteDWD.getCreateTime());
-        costHourDM.hour = tmpHour;
+        String tmpHour = adStatOfMinuteDWD.getHour() > 9 ? adStatOfMinuteDWD.getHour().toString() : "0" + adStatOfMinuteDWD.getHour().toString();
+        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
         //广告id
         costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
         //广告组id
@@ -64,7 +68,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
         //当天小时消耗
         costHourDM.costHour = adStatOfMinuteDWD.getCostHour();
         //消耗速度
-        costHourDM.costSpeed = this.minutenow == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / this.minutenow;
+        costHourDM.costSpeed = adStatOfMinuteDWD.getCostHour();
         //总浏览量
         costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
         //天-总浏览量
@@ -208,25 +212,10 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
 
 
     @Override
-    public void process(Long elementCount, ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, TimeWindow>.Context context,
+    public void process(Long elementCount, ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow>.Context context,
                         Iterable<AdStatOfHourDWD> iterable, Collector<CostHourDM> collector) throws Exception {
-        long beginTime = context.window().getStart();
-        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
-        LocalDate beginDate = beginDateTime.toLocalDate();
-        //当前天
-        String statDay = DateUtil.formatLocalDate(beginDate);
-        System.out.println(statDay);
-        //当前小时
-        int hourInt = beginDateTime.getHour();
-        this.minutenow = beginDateTime.getMinute();
-        String hour = beginDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-        String lastHour = beginDateTime.minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-        String lastTwoHour = beginDateTime.minusHours(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-        String lastThreeHour = beginDateTime.minusHours(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
-
-        long now = System.currentTimeMillis();
-
 
+        System.out.println("进入get in ");
         //获取前几分钟
 
 
@@ -235,9 +224,24 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
         for (AdStatOfHourDWD adStatOfMinuteDWD : iterable) {
             adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
             CostHourDM costHourDM = new CostHourDM();
-            if (adStatOfMinuteDWD.getHour() != hourInt) {
-                continue;
-            }
+
+
+            //当前天
+            String statDay = adStatOfMinuteDWD.getStatDay();
+            //当前小时
+            int hour = adStatOfMinuteDWD.getHour();
+            String tmpHour = "";
+            int tmpHourInt = adStatOfMinuteDWD.getHour() - 1;
+            tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
+            String lastHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
+
+            tmpHourInt = adStatOfMinuteDWD.getHour() - 2;
+            tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
+            String lastTwoHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
+
+            tmpHourInt = adStatOfMinuteDWD.getHour() - 3;
+            tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
+            String lastThreeHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
 
             String adId = adStatOfMinuteDWD.getAdId().toString();
             String sql = "select " +
@@ -261,6 +265,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
             CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
 
             collector.collect(costHourDM_new);
+            System.out.println("costhour_输入" + JsonUtil.toString(adStatOfMinuteDWD));
             System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
         }
         System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
@@ -269,7 +274,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
     }
 
     @Override
-    public void clear(ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, TimeWindow>.Context context) throws Exception {
+    public void clear(ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow>.Context context) throws Exception {
         System.out.println("窗口关闭");
     }
 

+ 6 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -6,6 +6,7 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
@@ -21,6 +22,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -38,9 +40,10 @@ public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD,
     private int minutenow = 1;
 
     @Override
-    public void open(Configuration conf) throws SQLException, ClassNotFoundException {
-        ClickhouseUtil clickhouseUtil = new ClickhouseUtil();
-        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
                 "8123", "data_monitoring");
     }