Browse Source

version 0.89

wcc 3 years ago
parent
commit
34c1e9cf9d

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -214,7 +214,7 @@ public class AdStatJob {
 
 
 
 
         // ------------------------------------------------------- 处理广告的天数据 -----------------------------------------
         // ------------------------------------------------------- 处理广告的天数据 -----------------------------------------
-        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
+        /*KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
 
 
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adStreamOfMinuteSource_kafka");
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adStreamOfMinuteSource_kafka");
 
 
@@ -353,7 +353,7 @@ public class AdStatJob {
         //.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
         //.addSink(new TunnelBatchSink<>(AdStatOfDayDWD.class, 30000L, 365L, 6));
         new KeyedBatchStream<>("adDayDWDYearStream", adDayDWDYearStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
         new KeyedBatchStream<>("adDayDWDYearStream", adDayDWDYearStream.keyBy(AdStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .toBatch()
-                .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));
+                .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class));*/
 
 
         env.execute();
         env.execute();
     }
     }

+ 0 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanStatJob.java

@@ -58,9 +58,6 @@ public class PlanStatJob {
     private static final String OLDEST_DAY = "2019-01-01";
     private static final String OLDEST_DAY = "2019-01-01";
 
 
     public static void main(String[] args) throws Exception {
     public static void main(String[] args) throws Exception {
-        System.setProperty("javax.net.ssl.trustStore", "D:\\Downloads\\kafka.client.truststore.jks");
-        // System.setProperty("javax.net.ssl.trustStore", "/root/flink-1.13.2/kafka.client.truststore.jks");
-        System.setProperty("javax.net.ssl.trustStorePassword", "KafkaOnsClient");
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 
         // 加载配置文件到 flink的全局配置中
         // 加载配置文件到 flink的全局配置中

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.table.planner.expressions.In;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Collector;
 
 
 import java.time.Duration;
 import java.time.Duration;

+ 15 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -118,23 +118,36 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         System.out.println("输出:" + JsonUtil.toString(newAdStat));
         System.out.println("输出:" + JsonUtil.toString(newAdStat));
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);
-        
+
+        List<String> removeKeys = new ArrayList<>(10);
         for (String key : lastReduceState.keys()) {
         for (String key : lastReduceState.keys()) {
             if (key.equals(beginDateTime.format(formatForLastReduceKey))
             if (key.equals(beginDateTime.format(formatForLastReduceKey))
                     || key.equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
                     || key.equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
                     || key.equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
                     || key.equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
                 continue;
                 continue;
             }
             }
-            lastReduceState.remove(key);
+            removeKeys.add(key);
+        }
+        if (!removeKeys.isEmpty()) {
+            for (String key : removeKeys) {
+                lastReduceState.remove(key);
+            }
         }
         }
+        removeKeys.clear();
         for (String key : yesterdayMinuteState.keys()) {
         for (String key : yesterdayMinuteState.keys()) {
             if (key.equals(DateUtil.formatLocalDate(beginDate))
             if (key.equals(DateUtil.formatLocalDate(beginDate))
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
                 continue;
                 continue;
             }
             }
+            removeKeys.add(key);
             yesterdayMinuteState.remove(key);
             yesterdayMinuteState.remove(key);
         }
         }
+        if (!removeKeys.isEmpty()) {
+            for (String key : removeKeys) {
+                yesterdayMinuteState.remove(key);
+            }
+        }
     }
     }
 
 
     @Override
     @Override

+ 14 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -114,21 +114,33 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         lastReduceState.put(beginDateTime.format(formatForLastReduceKey), newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);
         yesterdayMinuteState.put(statDay, newAdStat);
 
 
+        List<String> removeKeys = new ArrayList<>(10);
         for (String key : lastReduceState.keys()) {
         for (String key : lastReduceState.keys()) {
             if (key.equals(beginDateTime.format(formatForLastReduceKey))
             if (key.equals(beginDateTime.format(formatForLastReduceKey))
                     || key.equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
                     || key.equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
                     || key.equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
                     || key.equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
                 continue;
                 continue;
             }
             }
-            lastReduceState.remove(key);
+            removeKeys.add(key);
         }
         }
+        if (!removeKeys.isEmpty()) {
+            for (String key : removeKeys) {
+                lastReduceState.remove(key);
+            }
+        }
+        removeKeys.clear();
         for (String key : yesterdayMinuteState.keys()) {
         for (String key : yesterdayMinuteState.keys()) {
             if (key.equals(DateUtil.formatLocalDate(beginDate))
             if (key.equals(DateUtil.formatLocalDate(beginDate))
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
                     || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
                 continue;
                 continue;
             }
             }
-            yesterdayMinuteState.remove(key);
+            removeKeys.add(key);
+        }
+        if (!removeKeys.isEmpty()) {
+            for (String key : removeKeys) {
+                yesterdayMinuteState.remove(key);
+            }
         }
         }
     }
     }
 
 

+ 1 - 0
flink-ad-monitoring/src/main/resources/application.properties

@@ -12,4 +12,5 @@ kafka.servers=114.55.59.94:9093,112.124.33.132:9093
 kafka.username=alikafka_pre-cn-tl32fsx4l00x
 kafka.username=alikafka_pre-cn-tl32fsx4l00x
 kafka.password=VOEdhZLjOrL76lrl5bqPtydtoEkbs0Ny
 kafka.password=VOEdhZLjOrL76lrl5bqPtydtoEkbs0Ny
 kafka.sslPath=D:\\Downloads\\kafka.client.truststore.jks
 kafka.sslPath=D:\\Downloads\\kafka.client.truststore.jks
+# kafka.sslPath=/root/flink-1.13.2/kafka.client.truststore.jks
 kafka.connModule=SASL_SSL
 kafka.connModule=SASL_SSL