Quellcode durchsuchen

version 0.89 (maxcompute增加日志)

wcc vor 3 Jahren
Ursprung
Commit
add6cbb435

+ 1 - 1
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -88,7 +88,7 @@
               </filters>
               <transformers>
                 <transformer>
-                  <mainClass>flink.zanxiangnet.ad.monitoring.AdStatJob</mainClass>
+                  <mainClass>flink.zanxiangnet.ad.monitoring.PlanStatJob</mainClass>
                 </transformer>
               </transformers>
             </configuration>

+ 1 - 1
flink-ad-monitoring/pom.xml

@@ -202,7 +202,7 @@ under the License.
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>flink.zanxiangnet.ad.monitoring.AdStatJob</mainClass>
+                                    <mainClass>flink.zanxiangnet.ad.monitoring.PlanStatJob</mainClass>
                                 </transformer>
                             </transformers>
                         </configuration>

+ 6 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -34,7 +34,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -208,7 +207,9 @@ public class AdStatJob {
                         // 从 maxCompute拉取指定 广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
                             LocalDate endDay = today, beginDay = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";");
+                            String sql = "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("212===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, AdStatOfHourDWD> historyHourMap = records.stream()
@@ -331,7 +332,9 @@ public class AdStatJob {
                         // 从 maxCompute查找广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
                             LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND ad_id = " + element.getAdId() + ";");
+                            String sql = "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND ad_id = " + element.getAdId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("337===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, AdStatOfDayDWD> historyData = records.stream()

+ 6 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanStatJob.java

@@ -173,7 +173,9 @@ public class PlanStatJob {
                         // 从 maxCompute拉取指定 广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
                             LocalDate endDay = today, beginDay = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM plan_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND campaign_id = " + element.getCampaignId() + ";");
+                            String sql = "SELECT * FROM plan_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND campaign_id = " + element.getCampaignId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("178===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, PlanStatOfHourDWD> historyHourMap = records.stream()
@@ -291,7 +293,9 @@ public class PlanStatJob {
                         // 从 maxCompute查找广告的历史数据
                         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
                             LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
-                            Instance instance = SQLTask.run(odps, "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND campaign_id = " + element.getCampaignId() + ";");
+                            String sql = "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND campaign_id = " + element.getCampaignId() + ";";
+                            Instance instance = SQLTask.run(odps, sql);
+                            System.out.println("298===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
                             instance.waitForSuccess();
                             List<Record> records = SQLTask.getResult(instance);
                             Map<String, PlanStatOfDayDWD> historyData = records.stream()

+ 3 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -96,7 +96,9 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
             // 往前找 30天
             LocalDate endDay = beginDate.minusDays(2L), beginDay = endDay.minusDays(30L);
-            Instance instance = SQLTask.run(odps, "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND ad_id = " + adId + ";");
+            String sql = "SELECT * FROM ad_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND ad_id = " + adId + ";";
+            Instance instance = SQLTask.run(odps, sql);
+            System.out.println("101===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
             instance.waitForSuccess();
             List<Record> records = SQLTask.getResult(instance);
             List<AdStatOfDayDWD> historyDayData = records.stream().map(AdStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {

+ 3 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -92,7 +92,9 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
         if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
             // 往前找 30天
             LocalDate endDay = beginDate.minusDays(2L), beginDay = endDay.minusDays(30L);
-            Instance instance = SQLTask.run(odps, "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND campaign_id = " + campaignId + ";");
+            String sql = "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + endDay + "\" AND campaign_id = " + campaignId + ";";
+            Instance instance = SQLTask.run(odps, sql);
+            System.out.println("97===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
             instance.waitForSuccess();
             List<Record> records = SQLTask.getResult(instance);
             List<PlanStatOfDayDWD> historyDayData = records.stream().map(PlanStatOfDayDWD::byMaxCompute).sorted((val1, val2) -> {