Selaa lähdekoodia

从 maxCompute查改成从 ck查

wcc 3 vuotta sitten
vanhempi
commit
6136dbb5be

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -35,7 +35,7 @@ public class AdDayStreamJob {
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdDayStreamJob.class.getResourceAsStream("/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);

+ 2 - 28
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java → flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStatJob.java

@@ -1,58 +1,32 @@
 package flink.zanxiangnet.ad.monitoring;
 
-import com.aliyun.odps.Instance;
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.task.SQLTask;
-import com.tencent.ads.model.DailyReportsGetListStruct;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkHour;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkMinute;
-import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfDayDTO;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.process.*;
-import flink.zanxiangnet.ad.monitoring.sink.AdDWDToCkBatchSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
-import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.trigger.CostMinuteDMStreamTrigger;
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import flink.zanxiangnet.ad.monitoring.kafka.KafkaComponent;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.*;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
-import org.springframework.beans.BeanUtils;
 
 import java.time.Duration;
-import java.time.LocalDate;
 import java.util.*;
-import java.util.stream.Collectors;
 
 @Slf4j
-public class AdStatJob {
+public class AdHourStatJob {
 
     /**
      * 可能有数据的最早日期
@@ -64,7 +38,7 @@ public class AdStatJob {
 
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(AdHourStatJob.class.getResourceAsStream("/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);

+ 1 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -1,8 +1,6 @@
 package flink.zanxiangnet.ad.monitoring;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import flink.zanxiangnet.ad.monitoring.sink.AdDWDToCkBatchSink;
-import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import lombok.AllArgsConstructor;
@@ -39,7 +37,7 @@ public class Test {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // 加载配置文件到 flink的全局配置中
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(Test.class.getResourceAsStream("/application.properties"));
         Configuration configuration = new Configuration();
         props.stringPropertyNames().forEach(key -> {
             String value = props.getProperty(key);

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
 
-import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
@@ -27,7 +26,7 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
     public void open(Configuration config) throws Exception {
         super.open(config);
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(BatchSinkHour.class.getResourceAsStream("/application.properties"));
         connection = ClickhouseUtil.getConn(props);
     }
 

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
 
-import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
@@ -28,7 +27,7 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
     public void open(Configuration config) throws Exception {
         super.open(config);
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(BatchSinkMinute.class.getResourceAsStream("/application.properties"));
         connection = ClickhouseUtil.getConn(props);
     }
 

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
@@ -28,7 +27,7 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
     @Override
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(CostHourDayProcess.class.getResourceAsStream("/application.properties"));
         connection = ClickhouseUtil.getConn(props);
     }
 

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
@@ -28,7 +27,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
     @Override
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(CostHourProcess.class.getResourceAsStream("/application.properties"));
         connection = ClickhouseUtil.getConn(props);
     }
 

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteHourProcess.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
@@ -31,7 +30,7 @@ public class CostMinuteHourProcess extends ProcessWindowFunction<AdStatOfMinuteD
     @Override
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(CostMinuteHourProcess.class.getResourceAsStream("/application.properties"));
         connection = ClickhouseUtil.getConn(props);
     }
 

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -1,6 +1,5 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
@@ -28,7 +27,7 @@ public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD,
     @Override
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
-        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        props.load(CostMinuteProcess.class.getResourceAsStream("/application.properties"));
         connection = ClickhouseUtil.getConn(props);
     }