
Add cleanup logic

wcc 3 years ago
parent
commit
dd840615eb

+ 1 - 1
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -97,7 +97,7 @@
               </filters>
               <transformers>
                 <transformer>
-                  <mainClass>flink.zanxiangnet.ad.monitoring.PlanHourStreamJob</mainClass>
+                  <mainClass>flink.zanxiangnet.ad.monitoring.AdHourStreamJob</mainClass>
                 </transformer>
               </transformers>
             </configuration>

+ 1 - 1
flink-ad-monitoring/pom.xml

@@ -230,7 +230,7 @@ under the License.
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>flink.zanxiangnet.ad.monitoring.PlanHourStreamJob</mainClass>
+                                    <mainClass>flink.zanxiangnet.ad.monitoring.AdHourStreamJob</mainClass>
                                 </transformer>
                             </transformers>
                         </configuration>
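
These two hunks track the class rename that appears in the next file: `dependency-reduced-pom.xml` is generated by the shade plugin, so its `mainClass` must stay in sync with the one in `pom.xml`. After `mvn package`, the shaded jar's manifest should read `Main-Class: flink.zanxiangnet.ad.monitoring.AdHourStreamJob` (assuming standard ManifestResourceTransformer behavior).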

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStatJob.java → flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -25,7 +25,7 @@ import java.time.Duration;
 import java.util.*;
 
 @Slf4j
-public class AdHourStatJob {
+public class AdHourStreamJob {
 
     /**
      * 可能有数据的最早日期
@@ -37,7 +37,7 @@ public class AdHourStatJob {
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(AdHourStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdHourStreamJob.class.getResourceAsStream("/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);
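
The hunk cuts off inside the property-copy loop. A minimal sketch of the likely intent, assuming the standard Flink pattern of copying the properties into a `Configuration` and exposing it as global job parameters (the env wiring below is an assumption, not shown in this diff):

```java
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged completion of the truncated hunk above: copy every property into the
// Flink Configuration and register it so downstream operators can read it.
Properties props = new Properties();
props.load(AdHourStreamJob.class.getResourceAsStream("/application.properties"));
Configuration configuration = new Configuration();
props.stringPropertyNames().forEach(key ->
        configuration.setString(key, props.getProperty(key)));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(configuration);
```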

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.xml

@@ -92,9 +92,9 @@
             and stat_day >= #{beginDay}
         </if>
         <if test="endDay != null and endDay != ''">
-            and endDay <![CDATA[ <= ]]> #{endDay}
+            and stat_day <![CDATA[ <= ]]> #{endDay}
         </if>
-        order by stat_day desc, create_time desc limit ${queryCount}};
+        order by stat_day desc, create_time desc limit ${queryCount}
     </select>
 
     <insert id="add">
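
This hunk fixes two bugs in `lastReduceResult`: the `<if>` guard compared against the literal identifier `endDay` instead of the `stat_day` column, and a stray `}}` trailed the `limit` clause. For orientation, the Java side of this statement plausibly looks like the following (parameter names inferred from the XML placeholders; the exact signature is an assumption, not copied from the repo):

```java
import java.util.List;
import org.apache.ibatis.annotations.Param;

// Hypothetical mapper interface for the <select id="lastReduceResult"> above.
public interface AdStatOfDayDWDMapper {
    List<AdStatOfDayDWD> lastReduceResult(@Param("adId") Long adId,
                                          @Param("beginDay") String beginDay,
                                          @Param("endDay") String endDay,
                                          @Param("queryCount") int queryCount);
}
```

Note that `${queryCount}` is raw string substitution in MyBatis, so it must only ever receive a trusted integer; `#{queryCount}` would bind it as a prepared-statement parameter.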

+ 5 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/PlanStatOfDayDWDMapper.xml

@@ -84,15 +84,16 @@
     </sql>
 
     <select id="lastReduceResult" resultType="PlanStatOfDayDWD">
-        select <include refid="Base_Column_List"/>
+        select
+        <include refid="Base_Column_List"/>
         from plan_stat_of_day where campaign_id = #{campaignId}
         <if test="beginDay != null and beginDay != ''">
             and stat_day >= #{beginDay}
         </if>
         <if test="endDay != null and endDay != ''">
-            and endDay <![CDATA[ <= ]]> #{endDay}
+            and stat_day <![CDATA[ <= ]]> #{endDay}
         </if>
-        order by stat_day desc, create_time desc limit ${queryCount}};
+        order by stat_day desc, create_time desc limit ${queryCount}
     </select>
 
     <insert id="add">
@@ -178,7 +179,7 @@
     </insert>
 
     <insert id="addBatch">
-        INSERT INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
+        INSERT INTO plan_stat_of_day(<include refid="Base_Column_List"/>)
         VALUES
         <foreach collection="list" index="index" item="item" separator=",">
             ( #{item.statDay}, #{item.accountId},
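
Besides the same `stat_day`/`}}` fixes as the ad mapper above, this file corrects `addBatch`, which had been inserting plan rows into `ad_stat_of_day`. A hedged usage sketch of the batch insert (session handling mirrors the process functions in this commit; the call shape is an assumption):

```java
// Hedged usage sketch; assumes PlanStatOfDayDWDMapper declares
// int addBatch(@Param("list") List<PlanStatOfDayDWD> list).
try (SqlSession session = sqlSessionFactory.openSession()) {
    PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
    mapper.addBatch(batch);   // now targets plan_stat_of_day, not ad_stat_of_day
    session.commit();
}
```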

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/kafka/KafkaComponent.java

@@ -17,11 +17,11 @@ public class KafkaComponent {
         public static final String adDayTopic = "ad_day_cost_topic";
 
         public static class KafkaGroupId {
-            public static final String adHourConsumerGroup = "ad_hour_consumer_group";
-            public static final String adDayConsumerGroup = "ad_day_consumer_group";
+            public static final String adHourConsumerGroup = "ad_hour_consumer";
+            public static final String adDayConsumerGroup = "ad_day_consumer";
 
-            public static final String planHourConsumerGroup = "plan_hour_consumer_group";
-            public static final String planDayConsumerGroup = "plan_day_consumer_group";
+            public static final String planHourConsumerGroup = "plan_hour_consumer";
+            public static final String planDayConsumerGroup = "plan_day_consumer";
         }
     }
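
Renaming consumer groups is not a no-op: a new group id has no committed offsets, so on redeploy each job starts from whatever its offsets initializer falls back to. A minimal sketch of where these constants plug in, assuming Flink's `KafkaSource` and static imports of the constants above (the wiring itself is not part of this diff):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Minimal sketch (not from this commit); broker address and value type are placeholders.
KafkaSource<String> adDaySource = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics(adDayTopic)
        .setGroupId(adDayConsumerGroup)        // "ad_day_consumer" after this change
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```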
 

+ 5 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java

@@ -6,6 +6,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.PlanUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -76,6 +77,10 @@ public class PlanDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDat
                 PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
                 Map<String, PlanStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(LocalDate.now().minusDays(1L)), 60)
                         .stream()
+                        .peek(value -> {
+                            value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
+                            value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
+                        })
                         .collect(Collectors.toMap(PlanStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
                 historyReduceState.clear();
                 historyReduceState.putAll(historyData);
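
This is the cleanup logic the commit message refers to: rows coming back from `lastReduceResult` only carry the flat string fields `adIdsStr`/`adGroupMapStr`, so the new `.peek` step rebuilds the typed fields before the rows are cached in `historyReduceState`. A hedged sketch of what the `PlanUtil` helpers could look like; the method names match the diff, but the serialized formats (comma-separated ids, `adId:groupId` pairs) are assumptions, not taken from the repo:

```java
import java.util.*;
import java.util.stream.Collectors;

// Hedged sketch of the PlanUtil helpers the diff calls into.
public class PlanUtil {
    /** "1,2,3" -> [1, 2, 3]; null/blank -> empty list. */
    public static List<Long> parseAdStr(String adIdsStr) {
        if (adIdsStr == null || adIdsStr.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(adIdsStr.split(","))
                .map(String::trim)
                .map(Long::valueOf)
                .collect(Collectors.toList());
    }

    /** "1:10,2:20" -> {1=10, 2=20}; null/blank -> empty map. */
    public static Map<Long, Long> parseAdGroupMapStr(String adGroupMapStr) {
        if (adGroupMapStr == null || adGroupMapStr.isEmpty()) {
            return Collections.emptyMap();
        }
        return Arrays.stream(adGroupMapStr.split(","))
                .map(pair -> pair.split(":"))
                .collect(Collectors.toMap(p -> Long.valueOf(p[0]), p -> Long.valueOf(p[1])));
    }
}
```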

+ 5 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java

@@ -7,6 +7,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.PlanUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -84,6 +85,10 @@ public class PlanHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourO
                 PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
                 Map<String, PlanStatOfDayDWD> historyData = mapper.lastReduceResult(adId, null, DateUtil.formatLocalDate(today.minusDays(1L)), 60)
                         .stream()
+                        .peek(value -> {
+                            value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
+                            value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
+                        })
                         .collect(Collectors.toMap(PlanStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
                 historyReduceState.clear();
                 historyReduceState.putAll(historyData);
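
Same rehydration as in the month-roll process. It matters here because `historyReduceState` is Flink keyed state: whatever `putAll` stores is exactly what later invocations and timers see, so un-parsed string fields would otherwise leak into every downstream reduce. The descriptor behind it plausibly looks like this (the name and registration site are assumptions):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

// Assumed shape of the state backing historyReduceState: statDay -> latest
// reduced row for that day. Typically registered in open().
MapStateDescriptor<String, PlanStatOfDayDWD> historyReduceDesc =
        new MapStateDescriptor<>("historyReduceState", String.class, PlanStatOfDayDWD.class);
MapState<String, PlanStatOfDayDWD> historyReduceState =
        getRuntimeContext().getMapState(historyReduceDesc);
```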

+ 6 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -5,6 +5,7 @@ import flink.zanxiangnet.ad.monitoring.dao.mapper.PlanStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.PlanUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -26,6 +27,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS, PlanStatOfMinuteDWD, Long, TimeWindow> {
@@ -108,7 +110,10 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
         if (lastQueryDay == null || !lastQueryDay.equals(statDay)) {
             try (SqlSession session = sqlSessionFactory.openSession()) {
                 PlanStatOfDayDWDMapper mapper = session.getMapper(PlanStatOfDayDWDMapper.class);
-                List<PlanStatOfDayDWD> historyDayData = mapper.lastReduceResult(campaignId, null, DateUtil.formatLocalDate(beginDate.minusDays(2L)), 1);
+                List<PlanStatOfDayDWD> historyDayData = mapper.lastReduceResult(campaignId, null, DateUtil.formatLocalDate(beginDate.minusDays(2L)), 1).stream().peek(value -> {
+                    value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
+                    value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
+                }).collect(Collectors.toList());
                 if (!historyDayData.isEmpty()) {
                     historyDayState.update(historyDayData.get(historyDayData.size() - 1));
                 }
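
A small style note on the new stream pipeline: `Stream.peek` is documented mainly for debugging, and mutating elements inside it works but reads as a hidden side effect. An equivalent form using `map` makes the mutation explicit (purely a stylistic alternative, not part of this commit):

```java
// Equivalent to the diff's peek-based version, with the mutation made explicit.
List<PlanStatOfDayDWD> historyDayData = mapper
        .lastReduceResult(campaignId, null, DateUtil.formatLocalDate(beginDate.minusDays(2L)), 1)
        .stream()
        .map(value -> {
            value.setAdIds(PlanUtil.parseAdStr(value.getAdIdsStr()));
            value.setAdGroupMap(PlanUtil.parseAdGroupMapStr(value.getAdGroupMapStr()));
            return value;
        })
        .collect(Collectors.toList());
```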

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/TunnelBatchStreamSink.java

@@ -86,7 +86,6 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
     public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
         T element = value.get(0);
         String partitionStr = generatePartitionStr(element);
-        System.out.println("[" + tableName + "]写入数据量:" + value.size() + "写入分区:" + partitionStr);
         TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr), true);
         TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
         for (T t : value) {
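
The deleted `System.out.println` is the other half of the cleanup: stdout debug noise emitted once per batch in a sink. If the batch size and partition are still worth recording, a hedged alternative is the class logger, assuming this class carries the same `@Slf4j` annotation as the process functions in this repo:

```java
// Hedged replacement for the deleted println; assumes an Slf4j logger is available.
log.info("[{}] wrote {} rows to partition {}", tableName, value.size(), partitionStr);
```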