Browse Source

version 0.89 (并发访问 state的 bug)

wcc 3 years ago
parent
commit
593370ee1f

+ 0 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanStatJob.java

@@ -283,9 +283,6 @@ public class PlanStatJob {
                     @Override
                     public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
                                         Iterable<AdDataOfDayODS> iterable, Collector<PlanStatOfDayDWD> collector) throws Exception {
-                        for (String key : historyReduceState.keys()) {
-                            PlanStatOfDayDWD newStatData = historyReduceState.get(key);
-                        }
                         AdDataOfDayODS element = iterable.iterator().next();
                         LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
                         long now = System.currentTimeMillis();

+ 15 - 11
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -26,6 +26,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -120,13 +121,15 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         yesterdayMinuteState.put(statDay, newAdStat);
 
         List<String> removeKeys = new ArrayList<>(10);
-        for (String key : lastReduceState.keys()) {
-            if (key.equals(beginDateTime.format(formatForLastReduceKey))
-                    || key.equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
-                    || key.equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
+        Iterator<Map.Entry<String, AdStatOfMinuteDWD>> lastIterator = lastReduceState.iterator();
+        while (lastIterator.hasNext()) {
+            Map.Entry<String, AdStatOfMinuteDWD> temp = lastIterator.next();
+            if (temp.getKey().equals(beginDateTime.format(formatForLastReduceKey))
+                    || temp.getKey().equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
+                    || temp.getKey().equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
                 continue;
             }
-            removeKeys.add(key);
+            removeKeys.add(temp.getKey());
         }
         if (!removeKeys.isEmpty()) {
             for (String key : removeKeys) {
@@ -134,14 +137,15 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
             }
         }
         removeKeys.clear();
-        for (String key : yesterdayMinuteState.keys()) {
-            if (key.equals(DateUtil.formatLocalDate(beginDate))
-                    || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
-                    || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
+        Iterator<Map.Entry<String, AdStatOfMinuteDWD>> yesterdayIterator = yesterdayMinuteState.iterator();
+        while (yesterdayIterator.hasNext()) {
+            Map.Entry<String, AdStatOfMinuteDWD> temp = yesterdayIterator.next();
+            if (temp.getKey().equals(DateUtil.formatLocalDate(beginDate))
+                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
+                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
                 continue;
             }
-            removeKeys.add(key);
-            yesterdayMinuteState.remove(key);
+            removeKeys.add(temp.getKey());
         }
         if (!removeKeys.isEmpty()) {
             for (String key : removeKeys) {

+ 15 - 10
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -23,6 +23,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -115,13 +116,15 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
         yesterdayMinuteState.put(statDay, newAdStat);
 
         List<String> removeKeys = new ArrayList<>(10);
-        for (String key : lastReduceState.keys()) {
-            if (key.equals(beginDateTime.format(formatForLastReduceKey))
-                    || key.equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
-                    || key.equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
+        Iterator<Map.Entry<String, PlanStatOfMinuteDWD>> lastIterator = lastReduceState.iterator();
+        while (lastIterator.hasNext()) {
+            Map.Entry<String, PlanStatOfMinuteDWD> temp = lastIterator.next();
+            if (temp.getKey().equals(beginDateTime.format(formatForLastReduceKey))
+                    || temp.getKey().equals(beginDateTime.minusMinutes(5L).format(formatForLastReduceKey))
+                    || temp.getKey().equals(beginDateTime.minusMinutes(10L).format(formatForLastReduceKey))) {
                 continue;
             }
-            removeKeys.add(key);
+            removeKeys.add(temp.getKey());
         }
         if (!removeKeys.isEmpty()) {
             for (String key : removeKeys) {
@@ -129,13 +132,15 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
             }
         }
         removeKeys.clear();
-        for (String key : yesterdayMinuteState.keys()) {
-            if (key.equals(DateUtil.formatLocalDate(beginDate))
-                    || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
-                    || key.equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
+        Iterator<Map.Entry<String, PlanStatOfMinuteDWD>> yesterdayIterator = yesterdayMinuteState.iterator();
+        while (yesterdayIterator.hasNext()) {
+            Map.Entry<String, PlanStatOfMinuteDWD> temp = yesterdayIterator.next();
+            if (temp.getKey().equals(DateUtil.formatLocalDate(beginDate))
+                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(1L)))
+                    || temp.getKey().equals(DateUtil.formatLocalDate(beginDate.minusDays(2L)))) {
                 continue;
             }
-            removeKeys.add(key);
+            removeKeys.add(temp.getKey());
         }
         if (!removeKeys.isEmpty()) {
             for (String key : removeKeys) {