root 3 éve
szülő
commit
bd81df04db

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -157,7 +157,7 @@ public class AdHourStreamJob {
                 adHourDWDAllStream
                         .keyBy(AdStatOfHourDWD::getAdId)
                         .process(new CostHourProcess());
-        new BatchStream<>(adHourDMStream, 3000L, Time.minutes(1L))
+        new BatchStream<>(adHourDMStream, 5000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(1)
                 .addSink(new AdHourDMToCkBatchSink())
@@ -169,7 +169,7 @@ public class AdHourStreamJob {
                 .keyBy(AdStatOfDayDWD::getAdId)
                 .process(new AdDayOnTimeStreamCompletionProcess());
         // 写入 ck
-        new BatchStream<>(dayStreamFromHour, 3000L, Time.minutes(1L))
+        new BatchStream<>(dayStreamFromHour, 5000L, Time.minutes(1L))
                 .toBatch()
                 .setParallelism(1)
                 .addSink(new AdDayDWDToDBBatchSink())

+ 5 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToDBBatchSink.java

@@ -63,10 +63,15 @@ public class AdDayDWDToDBBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>
      */
     @Override
     public void invoke(List<AdStatOfDayDWD> value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
+        log.error("-----------准备往【ad_stat_of_day】写入数据, {}-----------", value.size());
         try (SqlSession session = sqlSessionFactory.openSession()) {
             AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
             mapper.addBatch(value);
             session.commit();
+        } catch (Exception e) {
+            log.error("---------------------------------------------ad_day_dwd_to_ck---------------------------------------------");
+            log.error(e.getMessage(), e);
+            log.error("---------------------------------------------ad_day_dwd_to_ck---------------------------------------------");
         }
     }
 

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdHourDMToCkBatchSink.java

@@ -51,6 +51,8 @@ public class AdHourDMToCkBatchSink extends RichSinkFunction<List<CostHourDM>> {
         configuration.getTypeAliasRegistry().registerAlias(CostHourDM.class);
         // addMapper一定要放到 alias的后面!!!!!
         configuration.addMapper(CostHourDMMapper.class);
+
+
         sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
     }
 

+ 0 - 9
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/TunnelBatchStreamSink.java

@@ -79,14 +79,10 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
      */
     @Override
     public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
-        long start = System.currentTimeMillis();
         T element = value.get(0);
         String partitionStr = generatePartitionStr(element);
         TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr), true);
         TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
-        long end = System.currentTimeMillis();
-        log.error("准备写入 maxCompute[{}], 准备用时:{}", value.size(), (end - start));
-        start = end;
         for (T t : value) {
             Record record = uploadSession.newRecord();
             for (BeanUtil.FieldInfo fieldInfo : fieldInfoList) {
@@ -104,9 +100,6 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
             // append只是写入内存
             pack.append(record);
         }
-        end = System.currentTimeMillis();
-        log.error("append {}条数据用时:{}", value.size(), end - start);
-        start = end;
         int retry = 0;
         do {
             try {
@@ -120,8 +113,6 @@ public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFuncti
                 }
             }
         } while (retry++ < 3);
-        end = System.currentTimeMillis();
-        log.error("写入 {}条数据用时:{}", value.size(), end - start);
     }
 
     @Override