wcc 3 anni fa
parent
commit
fe4f1b4127

+ 1 - 1
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -97,7 +97,7 @@
               </filters>
               <transformers>
                 <transformer>
-                  <mainClass>flink.zanxiangnet.ad.monitoring.AdHourStreamJob</mainClass>
+                  <mainClass>flink.zanxiangnet.ad.monitoring.AdDayStreamJob</mainClass>
                 </transformer>
               </transformers>
             </configuration>

+ 1 - 1
flink-ad-monitoring/pom.xml

@@ -244,7 +244,7 @@ under the License.
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>flink.zanxiangnet.ad.monitoring.AdHourStreamJob</mainClass>
+                                    <mainClass>flink.zanxiangnet.ad.monitoring.AdDayStreamJob</mainClass>
                                 </transformer>
                             </transformers>
                         </configuration>

+ 9 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -7,7 +7,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollMonthProcess;
 import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollYearProcess;
-import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToCkBatchSink;
+import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToDBBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
@@ -107,7 +107,8 @@ public class AdDayStreamJob {
         // 回滚 30天的数据计算
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDMonthStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
                 .keyBy(AdDataOfDayODS::getAdId)
-                .process(new AdDayDWDRollMonthProcess());
+                .process(new AdDayDWDRollMonthProcess())
+                .setParallelism(4);
 
         // 单个账号回滚一年
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
@@ -118,12 +119,15 @@ public class AdDayStreamJob {
         // 写入 maxCompute
         new KeyedBatchStream<>("adDayStream", adDayStream.keyBy(AdStatOfDayDWD::getStatDay), 1000L, 60 * 1000L)
                 .toBatch()
-                .setParallelism(6)
+                .setParallelism(8)
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
-                .setParallelism(6)
+                .setParallelism(8)
                 .name("sink_ad_year_dwd");
         // 写入 ck
-        new BatchStream<>("adDWDToCkStream", adDayStream, 1000L, 60 * 1000L).toBatch().addSink(new AdDayDWDToCkBatchSink());
+        new BatchStream<>("adDWDToCkStream", adDayStream, 1000L, 60 * 1000L)
+                .toBatch()
+                .addSink(new AdDayDWDToDBBatchSink())
+                .name("sink_ad_day_for_db");
 
         env.execute("ad_day_stream_job");
     }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -3,7 +3,7 @@ package flink.zanxiangnet.ad.monitoring;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.*;
-import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToCkBatchSink;
+import flink.zanxiangnet.ad.monitoring.sink.AdDayDWDToDBBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.AdHourDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.AdMinuteDMToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
@@ -171,7 +171,7 @@ public class AdHourStreamJob {
         // 写入 ck
         new BatchStream<>("adDayDWDToCkStream", dayStreamFromHour, 500L, 60 * 1000L)
                 .toBatch()
-                .addSink(new AdDayDWDToCkBatchSink())
+                .addSink(new AdDayDWDToDBBatchSink())
                 .name("ad_day_dwd_from_hour_sink");
 
         env.execute("ad_hour_stream_job");

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java

@@ -77,7 +77,7 @@ public class AdDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDataO
         if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
             try (SqlSession session = sqlSessionFactory.openSession()) {
                 AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
-                Map<String, AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60).stream()
+                Map<String, AdStatOfDayDWD> historyData = mapper.lastReduceResult(adId, DateUtil.formatLocalDate(LocalDate.now().minusDays(60L)), DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60).stream()
                         .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
                         .collect(Collectors.toMap(AdStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
                 historyReduceState.clear();

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToCkBatchSink.java → flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToDBBatchSink.java

@@ -23,7 +23,7 @@ import java.util.*;
  * 批量数据写出
  */
 @Slf4j
-public class AdDayDWDToCkBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>> {
+public class AdDayDWDToDBBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>> {
 
     private SqlSessionFactory sqlSessionFactory;