Procházet zdrojové kódy

ADD:数据回滚--年、天

cxyu před 3 roky
rodič
revize
a8843170da

+ 6 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -36,7 +36,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -211,7 +210,9 @@ public class AdStatJob {
                         // 从 maxCompute拉取指定 广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
                             LocalDate endDay = today, beginDay = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";");
+                            String sql = "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("212===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, AdStatOfHourDWD> historyHourMap = records.stream()
@@ -347,7 +348,9 @@ public class AdStatJob {
                         // 从 maxCompute查找广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
                             LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND ad_id = " + element.getAdId() + ";");
+                            String sql = "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND ad_id = " + element.getAdId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("337===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, AdStatOfDayDWD> historyData = records.stream()

+ 6 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanStatJob.java

@@ -173,7 +173,9 @@ public class PlanStatJob {
                         // 从 maxCompute拉取指定 广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
                             LocalDate endDay = today, beginDay = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM plan_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND campaign_id = " + element.getCampaignId() + ";");
+                            String sql = "SELECT * FROM plan_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND campaign_id = " + element.getCampaignId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("178===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, PlanStatOfHourDWD> historyHourMap = records.stream()
@@ -291,7 +293,9 @@ public class PlanStatJob {
                         // 从 maxCompute查找广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
                             LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND campaign_id = " + element.getCampaignId() + ";");
+                            String sql = "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND campaign_id = " + element.getCampaignId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("298===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, PlanStatOfDayDWD> historyData = records.stream()

+ 3 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -82,7 +82,6 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             statODS = adDataOfMinuteODS;
             break;
         }
-        System.out.println("windowCount:" + adDataOfMinuteODSList.size());
         if (statODS == null) {
             return;
         }
@@ -96,7 +95,9 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
             // 往前找 30天
             LocalDate endDay = beginDate.minusDays(2L), beginDay = endDay.minusDays(30L);
-            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND ad_id = " + adId + ";");
+            String sql = "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND ad_id = " + adId + ";";
+            Instance instance = SQLTask.run(odps, sql);
+            System.out.println("101===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
             instance.waitForSuccess();
             List<Record> records = SQLTask.getResult(instance);
             List<AdStatOfDayDWD> historyDayData = records.stream().map(AdStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {

+ 2 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -176,16 +176,15 @@ public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD,
             CostMinuterDM CostMinuterDM_new = datachange(adStatOfMinuteDWD, costMinuterDM);
 
             collector.collect(CostMinuterDM_new);
-            System.out.println("costminute_输出:" + JsonUtil.toString(CostMinuterDM_new));
+            // System.out.println("costminute_输出:" + JsonUtil.toString(CostMinuterDM_new));
         }
-        System.out.println("costminute_windowCount:" + adStatOfMinuteDWDlist.size());
+        // System.out.println("costminute_windowCount:" + adStatOfMinuteDWDlist.size());
 
 
     }
 
     @Override
     public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow>.Context context) throws Exception {
-        System.out.println("窗口关闭");
     }
 
 }

+ 3 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -92,7 +92,9 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
         if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
             // 往前找 30天
             LocalDate endDay = beginDate.minusDays(2L), beginDay = endDay.minusDays(30L);
-            Instance instance = SQLTask.run(odps, "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND campaign_id = " + campaignId + ";");
+            String sql = "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND campaign_id = " + campaignId + ";";
+            Instance instance = SQLTask.run(odps, sql);
+            System.out.println("97===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
             instance.waitForSuccess();
             List<Record> records = SQLTask.getResult(instance);
             List<PlanStatOfDayDWD> historyDayData = records.stream().map(PlanStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {
@@ -151,6 +153,5 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
 
     @Override
     public void clear(ProcessWindowFunction<AdDataOfMinuteODS, PlanStatOfMinuteDWD, Long, TimeWindow>.Context context) throws Exception {
-        System.out.println("窗口关闭");
     }
 }