소스 검색

日志调整

root 3 년 전
부모
커밋
082359cb0b

+ 1 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring;
 
-import com.zanxiangnet.module.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.*;
@@ -18,11 +17,8 @@ import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
-import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.PredefinedOptions;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -138,7 +134,7 @@ public class AdHourStreamJob {
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
                         .process(new AdHourDWDProcess());
 
-        // 小时流-写入maxCompute
+        // 小时流-写入ck
         new KeyedBatchStream<>(adHourDWDStream, AdStatOfHourDWD::getStatDay, 3000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(8)

+ 3 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java

@@ -5,9 +5,11 @@ import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import com.zanxiangnet.module.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
@@ -60,6 +62,7 @@ public class AdDayOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lon
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
 
         ValueStateDescriptor<String> lastQueryDayStateDescriptor = new ValueStateDescriptor<>("lastQueryDayState", String.class);
+        lastQueryDayStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(25)));
         lastQueryDayState = getRuntimeContext().getState(lastQueryDayStateDescriptor);
     }
 

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -67,9 +67,11 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
         ValueStateDescriptor<Long> lastQueryTimeStateDescriptor = new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG);
         lastQueryTimeStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(1)));
         lastQueryTimeState = getRuntimeContext().getState(lastQueryTimeStateDescriptor);
+
         ListStateDescriptor<AdStatOfDayDWD> historyReduceStateDescriptor = new ListStateDescriptor<>("historyReduceState", Types.POJO(AdStatOfDayDWD.class));
         historyReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(1)));
         historyReduceState = getRuntimeContext().getListState(historyReduceStateDescriptor);
+
         MapStateDescriptor<String, Map<Integer, AdDataOfHourODS>> historyStateDescriptor = new MapStateDescriptor<>("historyState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdDataOfHourODS.class)));
         historyStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(24)));
         historyState = getRuntimeContext().getMapState(historyStateDescriptor);

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -73,9 +73,11 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
         ListStateDescriptor<AdStatOfDayDWD> historyDayStateDescriptor = new ListStateDescriptor<>("historyDayState", Types.POJO(AdStatOfDayDWD.class));
         historyDayStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(2)));
         historyDayState = getRuntimeContext().getListState(historyDayStateDescriptor);
+
         ValueStateDescriptor<Long> lastQueryTimeStateDescriptor = new ValueStateDescriptor<>("lastQueryTimeState", Types.LONG);
         lastQueryTimeStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(2)));
         lastQueryTimeState = getRuntimeContext().getState(lastQueryTimeStateDescriptor);
+
         MapStateDescriptor<String, AdStatOfMinuteDWD> lastReduceStateDescriptor = new MapStateDescriptor<>("lastReduceState", Types.STRING, Types.POJO(AdStatOfMinuteDWD.class));
         lastReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(25)));
         lastReduceState = getRuntimeContext().getMapState(lastReduceStateDescriptor);

+ 5 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -4,8 +4,10 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import com.zanxiangnet.module.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.NumberUtil;
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -24,7 +26,9 @@ public class CostHourProcess extends KeyedProcessFunction<Long, AdStatOfHourDWD,
 
     @Override
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
-        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfHourDWD.class))));
+        MapStateDescriptor<String, Map<Integer, AdStatOfHourDWD>> historyReduceStateDescriptor = new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.MAP(Types.INT, Types.POJO(AdStatOfHourDWD.class)));
+        historyReduceStateDescriptor.enableTimeToLive(StateTtlUtil.rocketDBTtl(Time.hours(25)));
+        historyReduceState = getRuntimeContext().getMapState(historyReduceStateDescriptor);
     }
 
     @Override

+ 1 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToDBBatchSink.java

@@ -72,6 +72,7 @@ public class AdDayDWDToDBBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>
             log.error("---------------------------------------------ad_day_dwd_to_ck---------------------------------------------");
             log.error(e.getMessage(), e);
             log.error("---------------------------------------------ad_day_dwd_to_ck---------------------------------------------");
+            throw new RuntimeException(e);
         }
     }
 

+ 6 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdHourDMToCkBatchSink.java

@@ -64,9 +64,15 @@ public class AdHourDMToCkBatchSink extends RichSinkFunction<List<CostHourDM>> {
      */
     @Override
     public void invoke(List<CostHourDM> value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
+        log.error("-----------准备往【cost_hour】写入数据, {}-----------", value.size());
         try (SqlSession session = sqlSessionFactory.openSession()) {
             CostHourDMMapper mapper = session.getMapper(CostHourDMMapper.class);
             mapper.addBatch(value);
+        } catch (Exception e) {
+            log.error("---------------------------------------------cost_hour---------------------------------------------");
+            log.error(e.getMessage(), e);
+            log.error("---------------------------------------------cost_hour---------------------------------------------");
+            throw new RuntimeException(e);
         }
     }
 

+ 0 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/OssBatchStreamSink.java

@@ -81,7 +81,6 @@ public class OssBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<
     @Override
     public void invoke(IN value, Context context) {
         LocalDateTime now = LocalDateTime.now();
-        long start = System.currentTimeMillis();
         int index = 0;
         do {
             String savePath = System.getProperty("java.io.tmpdir") + "/flink/oss/" + now.toLocalDate().format(YYYY_MM_DD) + "/";
@@ -95,7 +94,6 @@ public class OssBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<
                 CsvUtil.print(clazz, value, new FileWriter(file));
                 String objectName = generateOssObjectName.objectName(now, savePath, fileName);
                 PutObjectResult result = ossClient.putObject(ossBucket, objectName, new FileInputStream(file));
-                System.out.println("OSS文件上传成功:" + "https://" + ossBucket +"." + endpoint + "/" + objectName);
                 break;
             } catch (InstantiationException | IllegalAccessException | IOException e) {
                 log.error("第 " + index + "次上传数据到 oss失败!" + e.getMessage(), e);
@@ -103,8 +101,6 @@ public class OssBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<
                 file.delete();
             }
         } while (index++ < RETRY_COUNT);
-        long end = System.currentTimeMillis();
-        System.out.println("写入 " + value.size() + "条数据用时:" + (end - start));
     }
 
     @Override

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/TimerCountTrigger.java

@@ -1,5 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.trigger;
 
+import flink.zanxiangnet.ad.monitoring.util.StateTtlUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -29,6 +30,7 @@ public class TimerCountTrigger<T, W extends TimeWindow> extends Trigger<T, W> {
     public TimerCountTrigger(Long maxBufferCount, Time bufferRefreshTime) {
         this.maxBufferCount = maxBufferCount;
         this.bufferRefreshTime = bufferRefreshTime.toMilliseconds();
+        countStateDesc.enableTimeToLive(StateTtlUtil.rocketDBTtl(org.apache.flink.api.common.time.Time.milliseconds(bufferRefreshTime.toMilliseconds())));
     }
 
     /**