浏览代码

继承 mybatis

wcc 3 年之前
父节点
当前提交
01f369c43c
共有 23 个文件被更改,包括 1218 次插入230 次删除
  1. 9 0
      flink-ad-monitoring/dependency-reduced-pom.xml
  2. 22 0
      flink-ad-monitoring/pom.xml
  3. 0 6
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java
  4. 12 99
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java
  5. 20 9
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java
  6. 0 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java
  7. 0 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java
  8. 42 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/config/ClickhouseDataSourceFactory.java
  9. 15 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.java
  10. 255 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.xml
  11. 2 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/kafka/KafkaComponent.java
  12. 26 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/dto/AdStatOfDayODSDTO.java
  13. 623 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfDayDWD.java
  14. 2 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfMinuteDWD.java
  15. 2 2
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/PlanStatOfMinuteDWD.java
  16. 10 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/properties/ApplicationProperties.java
  17. 89 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java
  18. 31 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollYearProcess.java
  19. 0 6
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java
  20. 0 8
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java
  21. 0 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteHourProcess.java
  22. 0 14
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java
  23. 58 73
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/ClickhouseBatchStreamSink.java

+ 9 - 0
flink-ad-monitoring/dependency-reduced-pom.xml

@@ -7,6 +7,15 @@
   <version>1.0-SNAPSHOT</version>
   <version>1.0-SNAPSHOT</version>
   <url>https://flink.apache.org</url>
   <url>https://flink.apache.org</url>
   <build>
   <build>
+    <resources>
+      <resource>
+        <filtering>true</filtering>
+        <directory>src/main/java</directory>
+        <includes>
+          <include>**/*.xml</include>
+        </includes>
+      </resource>
+    </resources>
     <pluginManagement>
     <pluginManagement>
       <plugins>
       <plugins>
         <plugin>
         <plugin>

+ 22 - 0
flink-ad-monitoring/pom.xml

@@ -143,6 +143,19 @@ under the License.
             <version>5.2.9.RELEASE</version>
             <version>5.2.9.RELEASE</version>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>org.mybatis</groupId>
+            <artifactId>mybatis</artifactId>
+            <version>3.5.8</version>
+        </dependency>
+        <!-- HikariCP 连接池 -->
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+            <version>4.0.3</version>
+        </dependency>
+
+
         <dependency>
         <dependency>
             <groupId>ru.yandex.clickhouse</groupId>
             <groupId>ru.yandex.clickhouse</groupId>
             <artifactId>clickhouse-jdbc</artifactId>
             <artifactId>clickhouse-jdbc</artifactId>
@@ -158,6 +171,15 @@ under the License.
     </dependencies>
     </dependencies>
 
 
     <build>
     <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.xml</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
         <plugins>
         <plugins>
 
 
             <!-- Java Compiler -->
             <!-- Java Compiler -->

+ 0 - 6
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -7,7 +7,6 @@ import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
 import com.aliyun.odps.task.SQLTask;
 import com.tencent.ads.model.DailyReportsGetListStruct;
 import com.tencent.ads.model.DailyReportsGetListStruct;
-import com.tencent.ads.model.HourlyReportsGetListStruct;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkHour;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkHour;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkMinute;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.BatchSinkMinute;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
 import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
@@ -15,12 +14,10 @@ import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfDayDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
 import flink.zanxiangnet.ad.monitoring.process.*;
 import flink.zanxiangnet.ad.monitoring.process.*;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
-import flink.zanxiangnet.ad.monitoring.trigger.CostHourDMStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.trigger.CostMinuteDMStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.trigger.CostMinuteDMStreamTrigger;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
@@ -32,7 +29,6 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -50,8 +46,6 @@ import org.springframework.beans.BeanUtils;
 
 
 import java.time.Duration;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
 import java.util.*;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 

+ 12 - 99
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -14,6 +14,8 @@ import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollMonthProcess;
+import flink.zanxiangnet.ad.monitoring.process.AdDayDWDRollYearProcess;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
@@ -68,30 +70,12 @@ public class PlanDayStreamJob {
         // 广告日数据。往前回滚 1年
         // 广告日数据。往前回滚 1年
         final OutputTag<AdDataOfDayODS> adDayStreamRollYearTag = new OutputTag<AdDataOfDayODS>("adDayStreamRollYearTag") {
         final OutputTag<AdDataOfDayODS> adDayStreamRollYearTag = new OutputTag<AdDataOfDayODS>("adDayStreamRollYearTag") {
         };
         };
+
+        // 天数据
         SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStream = adStreamOfDayIn.filter(StringUtils::isNotBlank)
         SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStream = adStreamOfDayIn.filter(StringUtils::isNotBlank)
-                .map(str -> {
-                    AdDataOfDayDTO dto = JsonUtil.toObj(str, AdDataOfDayDTO.class);
-                    Date createTime = new Date(dto.getCreateTime());
-                    DailyReportsGetListStruct struct = dto.getDailyReportsGetListStruct();
-                    AdDataOfDayODS adODS = new AdDataOfDayODS();
-                    BeanUtils.copyProperties(struct, adODS);
-                    adODS.setStatDay(struct.getDate());
-                    adODS.setAccountId(dto.getAccountId());
-                    adODS.setCampaignId(struct.getCampaignId());
-                    adODS.setAgencyAccountId(struct.getAccountId());
-                    adODS.setWechatAccountId(struct.getWechatAccountId());
-                    adODS.setAdgroupId(struct.getAdgroupId());
-                    adODS.setAdId(struct.getAdId());
-                    adODS.setCreateTime(createTime);
-                    adODS.removeNull();
-                    return AdStatOfDayODSDTO.builder()
-                            .startDate(dto.getStartDate())
-                            .endDate(dto.getEndDate())
-                            .adDataOfDayODS(adODS)
-                            .build();
-                });
+                .map(AdStatOfDayODSDTO::byJson);
 
 
-        // 拆分流
+        // 天数据-拆分流
         SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStreamSplit = adDayODSStream.process(new ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>() {
         SingleOutputStreamOperator<AdStatOfDayODSDTO> adDayODSStreamSplit = adDayODSStream.process(new ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>() {
             @Override
             @Override
             public void processElement(AdStatOfDayODSDTO adDataDTO, ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>.Context context, Collector<AdStatOfDayODSDTO> collector) throws Exception {
             public void processElement(AdStatOfDayODSDTO adDataDTO, ProcessFunction<AdStatOfDayODSDTO, AdStatOfDayODSDTO>.Context context, Collector<AdStatOfDayODSDTO> collector) throws Exception {
@@ -108,92 +92,21 @@ public class PlanDayStreamJob {
             }
             }
         });
         });
 
 
+        // 每日回滚 30天
         SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
         SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollDayTag)
                 .keyBy(AdDataOfDayODS::getCampaignId)
                 .keyBy(AdDataOfDayODS::getCampaignId)
-                .countWindow(1).process(new ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>() {
-                    private Odps odps;
-                    // 上次查询的时间
-                    private ValueState<String> lastQueryDayState;
-                    // 之前聚合的昨天的数据
-                    private MapState<String, PlanStatOfDayDWD> historyReduceState;
-
-                    @Override
-                    public void open(Configuration conf) throws Exception {
-                        Map<String, String> params = getRuntimeContext()
-                                .getExecutionConfig()
-                                .getGlobalJobParameters()
-                                .toMap();
-                        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
-                                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
-                        odps = new Odps(account);
-                        odps.getRestClient().setRetryLogger(new MaxComputeLog());
-                        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
-                        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
-
-                        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-                        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", String.class, PlanStatOfDayDWD.class));
-                    }
-
-                    @Override
-                    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
-                                        Iterable<AdDataOfDayODS> iterable, Collector<PlanStatOfDayDWD> collector) throws Exception {
-                        AdDataOfDayODS element = iterable.iterator().next();
-                        LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-                        long now = System.currentTimeMillis();
-
-                        String lastQueryDay = lastQueryDayState.value();
-                        // 从 maxCompute查找广告的历史数据
-                        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
-                            LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
-                            String sql = "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND campaign_id = " + element.getCampaignId() + ";";
-                            Instance instance = SQLTask.run(odps, sql);
-                            // log.error("sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
-                            instance.waitForSuccess();
-                            List<Record> records = SQLTask.getResult(instance);
-                            Map<String, PlanStatOfDayDWD> historyData = records.stream()
-                                    .map(PlanStatOfDayDWD::byMaxCompute)
-                                    .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
-                                    .collect(Collectors.toMap(PlanStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
-                            historyReduceState.clear();
-                            historyReduceState.putAll(historyData);
-                            lastQueryDayState.update(DateUtil.formatLocalDate(LocalDate.now()));
-                        }
-                        PlanStatOfDayDWD lastReduceData = null;
-                        for (int i = 1; i <= 60; i++) {
-                            lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(statDay.minusDays(i)));
-                            if (lastReduceData != null) {
-                                break;
-                            }
-                        }
-                        PlanStatOfDayDWD newStatData = PlanStatOfDayDWD.reduce(lastReduceData, element, now);
-                        historyReduceState.put(DateUtil.formatLocalDate(statDay), newStatData);
-                        collector.collect(newStatData);
-                    }
-                });
+                .countWindow(1)
+                .process(new AdDayDWDRollMonthProcess());
         new KeyedBatchStream<>("planDayDWDStream", planDayDWDStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
         new KeyedBatchStream<>("planDayDWDStream", planDayDWDStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))
                 .name("sink_plan_day_dwd");
                 .name("sink_plan_day_dwd");
 
 
+        // 往前回滚 365天
         SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
         SingleOutputStreamOperator<PlanStatOfDayDWD> planDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
                 .keyBy(AdDataOfDayODS::getCampaignId)
                 .keyBy(AdDataOfDayODS::getCampaignId)
-                .countWindow(1).process(new ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>() {
-                    // 上次聚合的结果
-                    private ValueState<PlanStatOfDayDWD> lastReduceState;
-
-                    public void open(Configuration conf) {
-                        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", PlanStatOfDayDWD.class));
-                    }
-
-                    @Override
-                    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
-                                        Iterable<AdDataOfDayODS> elements, Collector<PlanStatOfDayDWD> out) throws Exception {
-                        AdDataOfDayODS element = elements.iterator().next();
-                        PlanStatOfDayDWD newStatDWD = PlanStatOfDayDWD.reduce(lastReduceState.value(), element, System.currentTimeMillis());
-                        out.collect(newStatDWD);
-                        lastReduceState.update(newStatDWD);
-                    }
-                });
+                .countWindow(1)
+                .process(new AdDayDWDRollYearProcess());
         new KeyedBatchStream<>("planDayDWDYearStream", planDayDWDYearStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
         new KeyedBatchStream<>("planDayDWDYearStream", planDayDWDYearStream.keyBy(PlanStatOfDayDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))
                 .addSink(new TunnelBatchStreamSink<>(PlanStatOfDayDWD.class))

+ 20 - 9
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/Test.java

@@ -1,6 +1,7 @@
 package flink.zanxiangnet.ad.monitoring;
 package flink.zanxiangnet.ad.monitoring;
 
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import flink.zanxiangnet.ad.monitoring.sink.ClickhouseBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.BatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
@@ -10,6 +11,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapState;
@@ -38,6 +40,7 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -49,6 +52,15 @@ import java.util.stream.Collectors;
 public class Test {
 public class Test {
     public static void main(String[] args) throws Exception {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // 加载配置文件到 flink的全局配置中
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        Configuration configuration = new Configuration();
+        props.stringPropertyNames().forEach(key -> {
+            String value = props.getProperty(key);
+            configuration.setString(key.trim(), StringUtils.isBlank(value) ? "" : value.trim());
+        });
+        env.getConfig().setGlobalJobParameters(configuration);
 
 
         env.setParallelism(1);
         env.setParallelism(1);
 
 
@@ -57,7 +69,7 @@ public class Test {
         SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(2))
         SingleOutputStreamOperator<Pojo> pojoStream = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Pojo>forBoundedOutOfOrderness(Duration.ofHours(2))
                 .withTimestampAssigner((SerializableTimestampAssigner<Pojo>) (pojo, l) -> pojo.getCreateTime())
                 .withTimestampAssigner((SerializableTimestampAssigner<Pojo>) (pojo, l) -> pojo.getCreateTime())
         );
         );
-        pojoStream.keyBy(Pojo::getUserId)
+        SingleOutputStreamOperator<String> stringStream = pojoStream.keyBy(Pojo::getUserId)
                 .window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8)))
                 .window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8)))
                 .trigger(new Trigger<Pojo, TimeWindow>() {
                 .trigger(new Trigger<Pojo, TimeWindow>() {
                     /**
                     /**
@@ -128,18 +140,17 @@ public class Test {
                         log.error("trigger->clear");
                         log.error("trigger->clear");
                     }
                     }
                 })
                 })
-                .process(new ProcessWindowFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, Integer, TimeWindow>() {
+                .process(new ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>() {
                     @Override
                     @Override
                     public void process(Integer integer,
                     public void process(Integer integer,
-                                        ProcessWindowFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, Integer, TimeWindow>.Context context, Iterable<Pojo> iterable,
-                                        Collector<Tuple5<Integer, Long, Long, Integer, List<Long>>> collector) throws Exception {
-                        log.error("----------------- begin -------------------------");
-                        for (Pojo pojo : iterable) {
-                            log.error("收到元素:" + pojo);
+                                        ProcessWindowFunction<Pojo, Pojo, Integer, TimeWindow>.Context context, Iterable<Pojo> iterable,
+                                        Collector<Pojo> collector) throws Exception {
+                        for(Pojo pojo : iterable) {
+                            collector.collect(pojo);
                         }
                         }
-                        log.error("----------------- end --------------------------");
                     }
                     }
-                }).print();
+                }).map(Pojo::toString);
+        new BatchStream<>("", stringStream, 10L, 60 * 1000L).toBatch().addSink(new ClickhouseBatchStreamSink<>());
                 /*.aggregate(new AggregateFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, String>() {
                 /*.aggregate(new AggregateFunction<Pojo, Tuple5<Integer, Long, Long, Integer, List<Long>>, String>() {
 
 
                     @Override
                     @Override

+ 0 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -5,13 +5,10 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 import java.sql.PreparedStatement;
 import java.sql.PreparedStatement;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Properties;
 import java.util.Properties;
 
 
 @Slf4j
 @Slf4j

+ 0 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -5,13 +5,10 @@ import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Properties;
 import java.util.Properties;
 
 
 @Slf4j
 @Slf4j

+ 42 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/config/ClickhouseDataSourceFactory.java

@@ -0,0 +1,42 @@
+package flink.zanxiangnet.ad.monitoring.config;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.ibatis.datasource.DataSourceFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import javax.sql.DataSource;
+import java.util.Properties;
+
+/**
+ * Clickhouse连接池
+ */
+@Slf4j
+public class ClickhouseDataSourceFactory implements DataSourceFactory {
+
+    public static final String PROP_URL = "url";
+    public static final String PROP_USER = "user";
+    public static final String PROP_PASSWORD = "password";
+
+    private Properties props;
+
+    @Override
+    public void setProperties(Properties props) {
+        this.props = props;
+    }
+
+    @Override
+    public DataSource getDataSource() {
+        ClickHouseProperties ckProps = new ClickHouseProperties();
+        ckProps.setUser(props.getProperty(PROP_USER));
+        ckProps.setPassword(props.getProperty(PROP_PASSWORD));
+        DataSource dataSource = new BalancedClickhouseDataSource(props.getProperty(PROP_URL), ckProps);
+
+        HikariConfig config = new HikariConfig();
+        // 此处还可以配置连接池大小,连接超时之类的
+        config.setDataSource(dataSource);
+        return new HikariDataSource(config);
+    }
+}

+ 15 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.java

@@ -0,0 +1,15 @@
+package flink.zanxiangnet.ad.monitoring.dao.mapper;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface AdStatOfDayDWDMapper {
+
+    List<AdStatOfDayDWD> selectAll();
+
+    int add(@Param("item") AdStatOfDayDWD adStatOfDayDWD);
+
+    int addBatch(@Param("list") List<AdStatOfDayDWD> list);
+}

+ 255 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.xml

@@ -0,0 +1,255 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper">
+
+    <sql id="Base_Column_List">
+        stat_day, account_id,
+        campaign_id, agency_account_id,
+        wechat_account_id, wechat_agency_id,
+        adgroup_id, ad_id, create_time,
+        cost_deviation_rate_total, cost_deviation_rate_day,
+        cost_total, cost_day,
+        compensation_amount_total, compensation_amount_day,
+        view_count_total, view_count_day,
+        thousand_display_price_all, thousand_display_price_day,
+        view_user_count_total, view_user_count_day,
+        avg_view_per_user_all, avg_view_per_user_day,
+        valid_click_count_total, valid_click_count_day,
+        click_user_count_total, click_user_count_day,
+        ctr_all, ctr_day,
+        cpc_all, cpc_day,
+        valuable_click_count_total, valuable_click_count_day,
+        valuable_click_rate_all, valuable_click_rate_day,
+        valuable_click_cost_all, valuable_click_cost_day,
+        conversions_count_total, conversions_count_day,
+        conversions_cost_all, conversions_cost_day,
+        conversions_rate_all, conversions_rate_day,
+        deep_conversions_count_total, deep_conversions_count_day,
+        deep_conversions_cost_all, deep_conversions_cost_day,
+        deep_conversions_rate_all, deep_conversions_rate_day,
+        key_page_uv_total, key_page_uv_day,
+        order_count_total, order_count_day,
+        first_day_order_count_total, first_day_order_count_day,
+        web_order_cost_all, web_order_cost_day,
+        order_rate_all, order_rate_day,
+        order_amount_total, order_amount_day,
+        first_day_order_amount_total, first_day_order_amount_day,
+        order_unit_price_all, order_unit_price_day,
+        order_roi_all, order_roi_day,
+        sign_in_count_total, sign_in_count_day,
+        add_wishlist_count_total, add_wishlist_count_day,
+        view_commodity_page_uv_total, view_commodity_page_uv_day,
+        page_reservation_count_total, page_reservation_count_day,
+        leads_purchase_uv_total, leads_purchase_uv_day,
+        leads_purchase_cost_all, leads_purchase_cost_day,
+        leads_purchase_rate_all, leads_purchase_rate_day,
+        scan_follow_count_total, scan_follow_count_day,
+        wechat_app_register_uv_total, wechat_app_register_uv_day,
+        wechat_minigame_register_cost_all, wechat_minigame_register_cost_day,
+        wechat_minigame_register_rate_all, wechat_minigame_register_rate_day,
+        wechat_minigame_arpu_all, wechat_minigame_arpu_day,
+        wechat_minigame_retention_count_total, wechat_minigame_retention_count_day,
+        wechat_minigame_checkout_count_total, wechat_minigame_checkout_count_day,
+        wechat_minigame_checkout_amount_total, wechat_minigame_checkout_amount_day,
+        official_account_follow_count_total, official_account_follow_count_day,
+        official_account_follow_cost_all, official_account_follow_cost_day,
+        official_account_follow_rate_all, official_account_follow_rate_day,
+        official_account_register_user_count_total, official_account_register_user_count_day,
+        official_account_register_rate_all, official_account_register_rate_day,
+        official_account_register_cost_all, official_account_register_cost_day,
+        official_account_register_amount_total, official_account_register_amount_day,
+        official_account_register_roi_all, official_account_register_roi_day,
+        official_account_apply_count_total, official_account_apply_count_day,
+        official_account_apply_user_count_total, official_account_apply_user_count_day,
+        official_account_apply_rate_all, official_account_apply_rate_day,
+        official_account_apply_cost_all, official_account_apply_cost_day,
+        official_account_apply_amount_total, official_account_apply_amount_day,
+        official_account_apply_roi_all, official_account_apply_roi_day,
+        official_account_order_count_total, official_account_order_count_day,
+        official_account_first_day_order_count_total, official_account_first_day_order_count_day,
+        official_account_order_user_count_total, official_account_order_user_count_day,
+        official_account_order_rate_all, official_account_order_rate_day,
+        official_account_order_cost_all, official_account_order_cost_day,
+        official_account_order_amount_total, official_account_order_amount_day,
+        official_account_first_day_order_amount_total, official_account_first_day_order_amount_day,
+        official_account_order_roi_all, official_account_order_roi_day,
+        official_account_consult_count_total, official_account_consult_count_day,
+        official_account_reader_count_total, official_account_reader_count_day,
+        official_account_credit_apply_user_count_total, official_account_credit_apply_user_count_day,
+        official_account_credit_user_count_total, official_account_credit_user_count_day,
+        forward_count_total, forward_count_day,
+        forward_user_count_total, forward_user_count_day,
+        no_interest_count_total, no_interest_count_day
+    </sql>
+
+    <select id="selectAll" resultType="AdStatOfDayDWD">
+        select * from ad_stat_of_day
+    </select>
+
+    <insert id="add">
+        INSERT INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
+        VALUES
+        ( #{item.statDay}, #{item.accountId},
+        #{item.campaignId}, #{item.agencyAccountId},
+        #{item.wechatAccountId}, #{item.wechatAgencyId},
+        #{item.adgroupId}, #{item.adId}, #{item.createTime},
+        #{item.costDeviationRateTotal}, #{item.costDeviationRateDay},
+        #{item.costTotal}, #{item.costDay},
+        #{item.compensationAmountTotal}, #{item.compensationAmountDay},
+        #{item.viewCountTotal}, #{item.viewCountDay},
+        #{item.thousandDisplayPriceAll}, #{item.thousandDisplayPriceDay},
+        #{item.viewUserCountTotal}, #{item.viewUserCountDay},
+        #{item.avgViewPerUserAll}, #{item.avgViewPerUserDay},
+        #{item.validClickCountTotal}, #{item.validClickCountDay},
+        #{item.clickUserCountTotal}, #{item.clickUserCountDay},
+        #{item.ctrAll}, #{item.ctrDay},
+        #{item.cpcAll}, #{item.cpcDay},
+        #{item.valuableClickCountTotal}, #{item.valuableClickCountDay},
+        #{item.valuableClickRateAll}, #{item.valuableClickRateDay},
+        #{item.valuableClickCostAll}, #{item.valuableClickCostDay},
+        #{item.conversionsCountTotal}, #{item.conversionsCountDay},
+        #{item.conversionsCostAll}, #{item.conversionsCostDay},
+        #{item.conversionsRateAll}, #{item.conversionsRateDay},
+        #{item.deepConversionsCountTotal}, #{item.deepConversionsCountDay},
+        #{item.deepConversionsCostAll}, #{item.deepConversionsCostDay},
+        #{item.deepConversionsRateAll}, #{item.deepConversionsRateDay},
+        #{item.keyPageUvTotal}, #{item.keyPageUvDay},
+        #{item.orderCountTotal}, #{item.orderCountDay},
+        #{item.firstDayOrderCountTotal}, #{item.firstDayOrderCountDay},
+        #{item.webOrderCostAll}, #{item.webOrderCostDay},
+        #{item.orderRateAll}, #{item.orderRateDay},
+        #{item.orderAmountTotal}, #{item.orderAmountDay},
+        #{item.firstDayOrderAmountTotal}, #{item.firstDayOrderAmountDay},
+        #{item.orderUnitPriceAll}, #{item.orderUnitPriceDay},
+        #{item.orderRoiAll}, #{item.orderRoiDay},
+        #{item.signInCountTotal}, #{item.signInCountDay},
+        #{item.addWishlistCountTotal}, #{item.addWishlistCountDay},
+        #{item.viewCommodityPageUvTotal}, #{item.viewCommodityPageUvDay},
+        #{item.pageReservationCountTotal}, #{item.pageReservationCountDay},
+        #{item.leadsPurchaseUvTotal}, #{item.leadsPurchaseUvDay},
+        #{item.leadsPurchaseCostAll}, #{item.leadsPurchaseCostDay},
+        #{item.leadsPurchaseRateAll}, #{item.leadsPurchaseRateDay},
+        #{item.scanFollowCountTotal}, #{item.scanFollowCountDay},
+        #{item.wechatAppRegisterUvTotal}, #{item.wechatAppRegisterUvDay},
+        #{item.wechatMinigameRegisterCostAll}, #{item.wechatMinigameRegisterCostDay},
+        #{item.wechatMinigameRegisterRateAll}, #{item.wechatMinigameRegisterRateDay},
+        #{item.wechatMinigameArpuAll}, #{item.wechatMinigameArpuDay},
+        #{item.wechatMinigameRetentionCountTotal}, #{item.wechatMinigameRetentionCountDay},
+        #{item.wechatMinigameCheckoutCountTotal}, #{item.wechatMinigameCheckoutCountDay},
+        #{item.wechatMinigameCheckoutAmountTotal}, #{item.wechatMinigameCheckoutAmountDay},
+        #{item.officialAccountFollowCountTotal}, #{item.officialAccountFollowCountDay},
+        #{item.officialAccountFollowCostAll}, #{item.officialAccountFollowCostDay},
+        #{item.officialAccountFollowRateAll}, #{item.officialAccountFollowRateDay},
+        #{item.officialAccountRegisterUserCountTotal}, #{item.officialAccountRegisterUserCountDay},
+        #{item.officialAccountRegisterRateAll}, #{item.officialAccountRegisterRateDay},
+        #{item.officialAccountRegisterCostAll}, #{item.officialAccountRegisterCostDay},
+        #{item.officialAccountRegisterAmountTotal}, #{item.officialAccountRegisterAmountDay},
+        #{item.officialAccountRegisterRoiAll}, #{item.officialAccountRegisterRoiDay},
+        #{item.officialAccountApplyCountTotal}, #{item.officialAccountApplyCountDay},
+        #{item.officialAccountApplyUserCountTotal}, #{item.officialAccountApplyUserCountDay},
+        #{item.officialAccountApplyRateAll}, #{item.officialAccountApplyRateDay},
+        #{item.officialAccountApplyCostAll}, #{item.officialAccountApplyCostDay},
+        #{item.officialAccountApplyAmountTotal}, #{item.officialAccountApplyAmountDay},
+        #{item.officialAccountApplyRoiAll}, #{item.officialAccountApplyRoiDay},
+        #{item.officialAccountOrderCountTotal}, #{item.officialAccountOrderCountDay},
+        #{item.officialAccountFirstDayOrderCountTotal}, #{item.officialAccountFirstDayOrderCountDay},
+        #{item.officialAccountOrderUserCountTotal}, #{item.officialAccountOrderUserCountDay},
+        #{item.officialAccountOrderRateAll}, #{item.officialAccountOrderRateDay},
+        #{item.officialAccountOrderCostAll}, #{item.officialAccountOrderCostDay},
+        #{item.officialAccountOrderAmountTotal}, #{item.officialAccountOrderAmountDay},
+        #{item.officialAccountFirstDayOrderAmountTotal}, #{item.officialAccountFirstDayOrderAmountDay},
+        #{item.officialAccountOrderRoiAll}, #{item.officialAccountOrderRoiDay},
+        #{item.officialAccountConsultCountTotal}, #{item.officialAccountConsultCountDay},
+        #{item.officialAccountReaderCountTotal}, #{item.officialAccountReaderCountDay},
+        #{item.officialAccountCreditApplyUserCountTotal}, #{item.officialAccountCreditApplyUserCountDay},
+        #{item.officialAccountCreditUserCountTotal}, #{item.officialAccountCreditUserCountDay},
+        #{item.forwardCountTotal}, #{item.forwardCountDay},
+        #{item.forwardUserCountTotal}, #{item.forwardUserCountDay},
+        #{item.noInterestCountTotal}, #{item.noInterestCountDay})
+    </insert>
+
+    <insert id="addBatch">
+        INSERT INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
+        VALUES
+        <foreach collection="list" index="index" item="item" separator=",">
+            ( #{item.statDay}, #{item.accountId},
+            #{item.campaignId}, #{item.agencyAccountId},
+            #{item.wechatAccountId}, #{item.wechatAgencyId},
+            #{item.adgroupId}, #{item.adId}, #{item.createTime},
+            #{item.costDeviationRateTotal}, #{item.costDeviationRateDay},
+            #{item.costTotal}, #{item.costDay},
+            #{item.compensationAmountTotal}, #{item.compensationAmountDay},
+            #{item.viewCountTotal}, #{item.viewCountDay},
+            #{item.thousandDisplayPriceAll}, #{item.thousandDisplayPriceDay},
+            #{item.viewUserCountTotal}, #{item.viewUserCountDay},
+            #{item.avgViewPerUserAll}, #{item.avgViewPerUserDay},
+            #{item.validClickCountTotal}, #{item.validClickCountDay},
+            #{item.clickUserCountTotal}, #{item.clickUserCountDay},
+            #{item.ctrAll}, #{item.ctrDay},
+            #{item.cpcAll}, #{item.cpcDay},
+            #{item.valuableClickCountTotal}, #{item.valuableClickCountDay},
+            #{item.valuableClickRateAll}, #{item.valuableClickRateDay},
+            #{item.valuableClickCostAll}, #{item.valuableClickCostDay},
+            #{item.conversionsCountTotal}, #{item.conversionsCountDay},
+            #{item.conversionsCostAll}, #{item.conversionsCostDay},
+            #{item.conversionsRateAll}, #{item.conversionsRateDay},
+            #{item.deepConversionsCountTotal}, #{item.deepConversionsCountDay},
+            #{item.deepConversionsCostAll}, #{item.deepConversionsCostDay},
+            #{item.deepConversionsRateAll}, #{item.deepConversionsRateDay},
+            #{item.keyPageUvTotal}, #{item.keyPageUvDay},
+            #{item.orderCountTotal}, #{item.orderCountDay},
+            #{item.firstDayOrderCountTotal}, #{item.firstDayOrderCountDay},
+            #{item.webOrderCostAll}, #{item.webOrderCostDay},
+            #{item.orderRateAll}, #{item.orderRateDay},
+            #{item.orderAmountTotal}, #{item.orderAmountDay},
+            #{item.firstDayOrderAmountTotal}, #{item.firstDayOrderAmountDay},
+            #{item.orderUnitPriceAll}, #{item.orderUnitPriceDay},
+            #{item.orderRoiAll}, #{item.orderRoiDay},
+            #{item.signInCountTotal}, #{item.signInCountDay},
+            #{item.addWishlistCountTotal}, #{item.addWishlistCountDay},
+            #{item.viewCommodityPageUvTotal}, #{item.viewCommodityPageUvDay},
+            #{item.pageReservationCountTotal}, #{item.pageReservationCountDay},
+            #{item.leadsPurchaseUvTotal}, #{item.leadsPurchaseUvDay},
+            #{item.leadsPurchaseCostAll}, #{item.leadsPurchaseCostDay},
+            #{item.leadsPurchaseRateAll}, #{item.leadsPurchaseRateDay},
+            #{item.scanFollowCountTotal}, #{item.scanFollowCountDay},
+            #{item.wechatAppRegisterUvTotal}, #{item.wechatAppRegisterUvDay},
+            #{item.wechatMinigameRegisterCostAll}, #{item.wechatMinigameRegisterCostDay},
+            #{item.wechatMinigameRegisterRateAll}, #{item.wechatMinigameRegisterRateDay},
+            #{item.wechatMinigameArpuAll}, #{item.wechatMinigameArpuDay},
+            #{item.wechatMinigameRetentionCountTotal}, #{item.wechatMinigameRetentionCountDay},
+            #{item.wechatMinigameCheckoutCountTotal}, #{item.wechatMinigameCheckoutCountDay},
+            #{item.wechatMinigameCheckoutAmountTotal}, #{item.wechatMinigameCheckoutAmountDay},
+            #{item.officialAccountFollowCountTotal}, #{item.officialAccountFollowCountDay},
+            #{item.officialAccountFollowCostAll}, #{item.officialAccountFollowCostDay},
+            #{item.officialAccountFollowRateAll}, #{item.officialAccountFollowRateDay},
+            #{item.officialAccountRegisterUserCountTotal}, #{item.officialAccountRegisterUserCountDay},
+            #{item.officialAccountRegisterRateAll}, #{item.officialAccountRegisterRateDay},
+            #{item.officialAccountRegisterCostAll}, #{item.officialAccountRegisterCostDay},
+            #{item.officialAccountRegisterAmountTotal}, #{item.officialAccountRegisterAmountDay},
+            #{item.officialAccountRegisterRoiAll}, #{item.officialAccountRegisterRoiDay},
+            #{item.officialAccountApplyCountTotal}, #{item.officialAccountApplyCountDay},
+            #{item.officialAccountApplyUserCountTotal}, #{item.officialAccountApplyUserCountDay},
+            #{item.officialAccountApplyRateAll}, #{item.officialAccountApplyRateDay},
+            #{item.officialAccountApplyCostAll}, #{item.officialAccountApplyCostDay},
+            #{item.officialAccountApplyAmountTotal}, #{item.officialAccountApplyAmountDay},
+            #{item.officialAccountApplyRoiAll}, #{item.officialAccountApplyRoiDay},
+            #{item.officialAccountOrderCountTotal}, #{item.officialAccountOrderCountDay},
+            #{item.officialAccountFirstDayOrderCountTotal}, #{item.officialAccountFirstDayOrderCountDay},
+            #{item.officialAccountOrderUserCountTotal}, #{item.officialAccountOrderUserCountDay},
+            #{item.officialAccountOrderRateAll}, #{item.officialAccountOrderRateDay},
+            #{item.officialAccountOrderCostAll}, #{item.officialAccountOrderCostDay},
+            #{item.officialAccountOrderAmountTotal}, #{item.officialAccountOrderAmountDay},
+            #{item.officialAccountFirstDayOrderAmountTotal}, #{item.officialAccountFirstDayOrderAmountDay},
+            #{item.officialAccountOrderRoiAll}, #{item.officialAccountOrderRoiDay},
+            #{item.officialAccountConsultCountTotal}, #{item.officialAccountConsultCountDay},
+            #{item.officialAccountReaderCountTotal}, #{item.officialAccountReaderCountDay},
+            #{item.officialAccountCreditApplyUserCountTotal}, #{item.officialAccountCreditApplyUserCountDay},
+            #{item.officialAccountCreditUserCountTotal}, #{item.officialAccountCreditUserCountDay},
+            #{item.forwardCountTotal}, #{item.forwardCountDay},
+            #{item.forwardUserCountTotal}, #{item.forwardUserCountDay},
+            #{item.noInterestCountTotal}, #{item.noInterestCountDay})
+        </foreach>
+    </insert>
+</mapper>

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/kafka/KafkaComponent.java

@@ -61,8 +61,8 @@ public class KafkaComponent {
                 .setTopics(topic)
                 .setTopics(topic)
                 .setGroupId(groupId)
                 .setGroupId(groupId)
                 .setProperties(kafkaProps)
                 .setProperties(kafkaProps)
-                .setStartingOffsets(OffsetsInitializer.earliest())
-                // .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
+                // .setStartingOffsets(OffsetsInitializer.earliest())
+                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                 .setValueOnlyDeserializer(new SimpleStringSchema())
                 .setValueOnlyDeserializer(new SimpleStringSchema())
                 .build();
                 .build();
     }
     }

+ 26 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/dto/AdStatOfDayODSDTO.java

@@ -1,13 +1,17 @@
 package flink.zanxiangnet.ad.monitoring.pojo.dto;
 package flink.zanxiangnet.ad.monitoring.pojo.dto;
 
 
+import com.tencent.ads.model.DailyReportsGetListStruct;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Builder;
 import lombok.Data;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NoArgsConstructor;
+import org.springframework.beans.BeanUtils;
 
 
 import java.time.LocalDate;
 import java.time.LocalDate;
+import java.util.Date;
 
 
 @Data
 @Data
 @NoArgsConstructor
 @NoArgsConstructor
@@ -18,4 +22,26 @@ public class AdStatOfDayODSDTO {
     private LocalDate endDate;
     private LocalDate endDate;
 
 
     private AdDataOfDayODS adDataOfDayODS;
     private AdDataOfDayODS adDataOfDayODS;
+
+    public static AdStatOfDayODSDTO byJson(String json) {
+        AdDataOfDayDTO dto = JsonUtil.toObj(json, AdDataOfDayDTO.class);
+        Date createTime = new Date(dto.getCreateTime());
+        DailyReportsGetListStruct struct = dto.getDailyReportsGetListStruct();
+        AdDataOfDayODS adODS = new AdDataOfDayODS();
+        BeanUtils.copyProperties(struct, adODS);
+        adODS.setStatDay(struct.getDate());
+        adODS.setAccountId(dto.getAccountId());
+        adODS.setCampaignId(struct.getCampaignId());
+        adODS.setAgencyAccountId(struct.getAccountId());
+        adODS.setWechatAccountId(struct.getWechatAccountId());
+        adODS.setAdgroupId(struct.getAdgroupId());
+        adODS.setAdId(struct.getAdId());
+        adODS.setCreateTime(createTime);
+        adODS.removeNull();
+        return AdStatOfDayODSDTO.builder()
+                .startDate(dto.getStartDate())
+                .endDate(dto.getEndDate())
+                .adDataOfDayODS(adODS)
+                .build();
+    }
 }
 }

+ 623 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfDayDWD.java

@@ -1453,4 +1453,627 @@ public class AdStatOfDayDWD implements Serializable {
         result.setNoInterestCountDay(ObjectUtil.toLong(record.get("no_interest_count_day")));
         result.setNoInterestCountDay(ObjectUtil.toLong(record.get("no_interest_count_day")));
         return result;
         return result;
     }
     }
+
+    public void removeNull() {
+
+        if (statDay == null) {
+            statDay = "";
+        }
+
+        if (accountId == null) {
+            accountId = 0L;
+        }
+
+        if (campaignId == null) {
+            campaignId = 0L;
+        }
+
+        if (agencyAccountId == null) {
+            agencyAccountId = 0L;
+        }
+
+        if (wechatAccountId == null) {
+            wechatAccountId = "";
+        }
+
+        if (wechatAgencyId == null) {
+            wechatAgencyId = "";
+        }
+
+        if (adgroupId == null) {
+            adgroupId = 0L;
+        }
+
+        if (adId == null) {
+            adId = 0L;
+        }
+
+        if (createTime == null) {
+            createTime = new Date();
+        }
+
+        if (costDeviationRateTotal == null) {
+            costDeviationRateTotal = 0.0;
+        }
+
+        if (costDeviationRateDay == null) {
+            costDeviationRateDay = 0.0;
+        }
+
+        if (costTotal == null) {
+            costTotal = 0L;
+        }
+
+        if (costDay == null) {
+            costDay = 0L;
+        }
+
+        if (compensationAmountTotal == null) {
+            compensationAmountTotal = 0L;
+        }
+
+        if (compensationAmountDay == null) {
+            compensationAmountDay = 0L;
+        }
+
+        if (viewCountTotal == null) {
+            viewCountTotal = 0L;
+        }
+
+        if (viewCountDay == null) {
+            viewCountDay = 0L;
+        }
+
+        if (thousandDisplayPriceAll == null) {
+            thousandDisplayPriceAll = 0L;
+        }
+
+        if (thousandDisplayPriceDay == null) {
+            thousandDisplayPriceDay = 0L;
+        }
+
+        if (viewUserCountTotal == null) {
+            viewUserCountTotal = 0L;
+        }
+
+        if (viewUserCountDay == null) {
+            viewUserCountDay = 0L;
+        }
+
+        if (avgViewPerUserAll == null) {
+            avgViewPerUserAll = 0.0;
+        }
+
+        if (avgViewPerUserDay == null) {
+            avgViewPerUserDay = 0.0;
+        }
+
+        if (validClickCountTotal == null) {
+            validClickCountTotal = 0L;
+        }
+
+        if (validClickCountDay == null) {
+            validClickCountDay = 0L;
+        }
+
+        if (clickUserCountTotal == null) {
+            clickUserCountTotal = 0L;
+        }
+
+        if (clickUserCountDay == null) {
+            clickUserCountDay = 0L;
+        }
+
+        if (ctrAll == null) {
+            ctrAll = 0.0;
+        }
+
+        if (ctrDay == null) {
+            ctrDay = 0.0;
+        }
+
+        if (cpcAll == null) {
+            cpcAll = 0L;
+        }
+
+        if (cpcDay == null) {
+            cpcDay = 0L;
+        }
+
+        if (valuableClickCountTotal == null) {
+            valuableClickCountTotal = 0L;
+        }
+
+        if (valuableClickCountDay == null) {
+            valuableClickCountDay = 0L;
+        }
+
+        if (valuableClickRateAll == null) {
+            valuableClickRateAll = 0.0;
+        }
+
+        if (valuableClickRateDay == null) {
+            valuableClickRateDay = 0.0;
+        }
+
+        if (valuableClickCostAll == null) {
+            valuableClickCostAll = 0L;
+        }
+
+        if (valuableClickCostDay == null) {
+            valuableClickCostDay = 0L;
+        }
+
+        if (conversionsCountTotal == null) {
+            conversionsCountTotal = 0L;
+        }
+
+        if (conversionsCountDay == null) {
+            conversionsCountDay = 0L;
+        }
+
+        if (conversionsCostAll == null) {
+            conversionsCostAll = 0L;
+        }
+
+        if (conversionsCostDay == null) {
+            conversionsCostDay = 0L;
+        }
+
+        if (conversionsRateAll == null) {
+            conversionsRateAll = 0.0;
+        }
+
+        if (conversionsRateDay == null) {
+            conversionsRateDay = 0.0;
+        }
+
+        if (deepConversionsCountTotal == null) {
+            deepConversionsCountTotal = 0L;
+        }
+
+        if (deepConversionsCountDay == null) {
+            deepConversionsCountDay = 0L;
+        }
+
+        if (deepConversionsCostAll == null) {
+            deepConversionsCostAll = 0L;
+        }
+
+        if (deepConversionsCostDay == null) {
+            deepConversionsCostDay = 0L;
+        }
+
+        if (deepConversionsRateAll == null) {
+            deepConversionsRateAll = 0.0;
+        }
+
+        if (deepConversionsRateDay == null) {
+            deepConversionsRateDay = 0.0;
+        }
+
+        if (keyPageUvTotal == null) {
+            keyPageUvTotal = 0L;
+        }
+
+        if (keyPageUvDay == null) {
+            keyPageUvDay = 0L;
+        }
+
+        if (orderCountTotal == null) {
+            orderCountTotal = 0L;
+        }
+
+        if (orderCountDay == null) {
+            orderCountDay = 0L;
+        }
+
+        if (firstDayOrderCountTotal == null) {
+            firstDayOrderCountTotal = 0L;
+        }
+
+        if (firstDayOrderCountDay == null) {
+            firstDayOrderCountDay = 0L;
+        }
+
+        if (webOrderCostAll == null) {
+            webOrderCostAll = 0L;
+        }
+
+        if (webOrderCostDay == null) {
+            webOrderCostDay = 0L;
+        }
+
+        if (orderRateAll == null) {
+            orderRateAll = 0.0;
+        }
+
+        if (orderRateDay == null) {
+            orderRateDay = 0.0;
+        }
+
+        if (orderAmountTotal == null) {
+            orderAmountTotal = 0L;
+        }
+
+        if (orderAmountDay == null) {
+            orderAmountDay = 0L;
+        }
+
+        if (firstDayOrderAmountTotal == null) {
+            firstDayOrderAmountTotal = 0L;
+        }
+
+        if (firstDayOrderAmountDay == null) {
+            firstDayOrderAmountDay = 0L;
+        }
+
+        if (orderUnitPriceAll == null) {
+            orderUnitPriceAll = 0L;
+        }
+
+        if (orderUnitPriceDay == null) {
+            orderUnitPriceDay = 0L;
+        }
+
+        if (orderRoiAll == null) {
+            orderRoiAll = 0.0;
+        }
+
+        if (orderRoiDay == null) {
+            orderRoiDay = 0.0;
+        }
+
+        if (signInCountTotal == null) {
+            signInCountTotal = 0L;
+        }
+
+        if (signInCountDay == null) {
+            signInCountDay = 0L;
+        }
+
+        if (addWishlistCountTotal == null) {
+            addWishlistCountTotal = 0L;
+        }
+
+        if (addWishlistCountDay == null) {
+            addWishlistCountDay = 0L;
+        }
+
+        if (viewCommodityPageUvTotal == null) {
+            viewCommodityPageUvTotal = 0L;
+        }
+
+        if (viewCommodityPageUvDay == null) {
+            viewCommodityPageUvDay = 0L;
+        }
+
+        if (pageReservationCountTotal == null) {
+            pageReservationCountTotal = 0L;
+        }
+
+        if (pageReservationCountDay == null) {
+            pageReservationCountDay = 0L;
+        }
+
+        if (leadsPurchaseUvTotal == null) {
+            leadsPurchaseUvTotal = 0L;
+        }
+
+        if (leadsPurchaseUvDay == null) {
+            leadsPurchaseUvDay = 0L;
+        }
+
+        if (leadsPurchaseCostAll == null) {
+            leadsPurchaseCostAll = 0L;
+        }
+
+        if (leadsPurchaseCostDay == null) {
+            leadsPurchaseCostDay = 0L;
+        }
+
+        if (leadsPurchaseRateAll == null) {
+            leadsPurchaseRateAll = 0.0;
+        }
+
+        if (leadsPurchaseRateDay == null) {
+            leadsPurchaseRateDay = 0.0;
+        }
+
+        if (scanFollowCountTotal == null) {
+            scanFollowCountTotal = 0L;
+        }
+
+        if (scanFollowCountDay == null) {
+            scanFollowCountDay = 0L;
+        }
+
+        if (wechatAppRegisterUvTotal == null) {
+            wechatAppRegisterUvTotal = 0L;
+        }
+
+        if (wechatAppRegisterUvDay == null) {
+            wechatAppRegisterUvDay = 0L;
+        }
+
+        if (wechatMinigameRegisterCostAll == null) {
+            wechatMinigameRegisterCostAll = 0L;
+        }
+
+        if (wechatMinigameRegisterCostDay == null) {
+            wechatMinigameRegisterCostDay = 0L;
+        }
+
+        if (wechatMinigameRegisterRateAll == null) {
+            wechatMinigameRegisterRateAll = 0.0;
+        }
+
+        if (wechatMinigameRegisterRateDay == null) {
+            wechatMinigameRegisterRateDay = 0.0;
+        }
+
+        if (wechatMinigameArpuAll == null) {
+            wechatMinigameArpuAll = 0.0;
+        }
+
+        if (wechatMinigameArpuDay == null) {
+            wechatMinigameArpuDay = 0.0;
+        }
+
+        if (wechatMinigameRetentionCountTotal == null) {
+            wechatMinigameRetentionCountTotal = 0L;
+        }
+
+        if (wechatMinigameRetentionCountDay == null) {
+            wechatMinigameRetentionCountDay = 0L;
+        }
+
+        if (wechatMinigameCheckoutCountTotal == null) {
+            wechatMinigameCheckoutCountTotal = 0L;
+        }
+
+        if (wechatMinigameCheckoutCountDay == null) {
+            wechatMinigameCheckoutCountDay = 0L;
+        }
+
+        if (wechatMinigameCheckoutAmountTotal == null) {
+            wechatMinigameCheckoutAmountTotal = 0L;
+        }
+
+        if (wechatMinigameCheckoutAmountDay == null) {
+            wechatMinigameCheckoutAmountDay = 0L;
+        }
+
+        if (officialAccountFollowCountTotal == null) {
+            officialAccountFollowCountTotal = 0L;
+        }
+
+        if (officialAccountFollowCountDay == null) {
+            officialAccountFollowCountDay = 0L;
+        }
+
+        if (officialAccountFollowCostAll == null) {
+            officialAccountFollowCostAll = 0L;
+        }
+
+        if (officialAccountFollowCostDay == null) {
+            officialAccountFollowCostDay = 0L;
+        }
+
+        if (officialAccountFollowRateAll == null) {
+            officialAccountFollowRateAll = 0.0;
+        }
+
+        if (officialAccountFollowRateDay == null) {
+            officialAccountFollowRateDay = 0.0;
+        }
+
+        if (officialAccountRegisterUserCountTotal == null) {
+            officialAccountRegisterUserCountTotal = 0L;
+        }
+
+        if (officialAccountRegisterUserCountDay == null) {
+            officialAccountRegisterUserCountDay = 0L;
+        }
+
+        if (officialAccountRegisterRateAll == null) {
+            officialAccountRegisterRateAll = 0.0;
+        }
+
+        if (officialAccountRegisterRateDay == null) {
+            officialAccountRegisterRateDay = 0.0;
+        }
+
+        if (officialAccountRegisterCostAll == null) {
+            officialAccountRegisterCostAll = 0L;
+        }
+
+        if (officialAccountRegisterCostDay == null) {
+            officialAccountRegisterCostDay = 0L;
+        }
+
+        if (officialAccountRegisterAmountTotal == null) {
+            officialAccountRegisterAmountTotal = 0L;
+        }
+
+        if (officialAccountRegisterAmountDay == null) {
+            officialAccountRegisterAmountDay = 0L;
+        }
+
+        if (officialAccountRegisterRoiAll == null) {
+            officialAccountRegisterRoiAll = 0L;
+        }
+
+        if (officialAccountRegisterRoiDay == null) {
+            officialAccountRegisterRoiDay = 0L;
+        }
+
+        if (officialAccountApplyCountTotal == null) {
+            officialAccountApplyCountTotal = 0L;
+        }
+
+        if (officialAccountApplyCountDay == null) {
+            officialAccountApplyCountDay = 0L;
+        }
+
+        if (officialAccountApplyUserCountTotal == null) {
+            officialAccountApplyUserCountTotal = 0L;
+        }
+
+        if (officialAccountApplyUserCountDay == null) {
+            officialAccountApplyUserCountDay = 0L;
+        }
+
+        if (officialAccountApplyRateAll == null) {
+            officialAccountApplyRateAll = 0.0;
+        }
+
+        if (officialAccountApplyRateDay == null) {
+            officialAccountApplyRateDay = 0.0;
+        }
+
+        if (officialAccountApplyCostAll == null) {
+            officialAccountApplyCostAll = 0L;
+        }
+
+        if (officialAccountApplyCostDay == null) {
+            officialAccountApplyCostDay = 0L;
+        }
+
+        if (officialAccountApplyAmountTotal == null) {
+            officialAccountApplyAmountTotal = 0L;
+        }
+
+        if (officialAccountApplyAmountDay == null) {
+            officialAccountApplyAmountDay = 0L;
+        }
+
+        if (officialAccountApplyRoiAll == null) {
+            officialAccountApplyRoiAll = 0L;
+        }
+
+        if (officialAccountApplyRoiDay == null) {
+            officialAccountApplyRoiDay = 0L;
+        }
+
+        if (officialAccountOrderCountTotal == null) {
+            officialAccountOrderCountTotal = 0L;
+        }
+
+        if (officialAccountOrderCountDay == null) {
+            officialAccountOrderCountDay = 0L;
+        }
+
+        if (officialAccountFirstDayOrderCountTotal == null) {
+            officialAccountFirstDayOrderCountTotal = 0L;
+        }
+
+        if (officialAccountFirstDayOrderCountDay == null) {
+            officialAccountFirstDayOrderCountDay = 0L;
+        }
+
+        if (officialAccountOrderUserCountTotal == null) {
+            officialAccountOrderUserCountTotal = 0L;
+        }
+
+        if (officialAccountOrderUserCountDay == null) {
+            officialAccountOrderUserCountDay = 0L;
+        }
+
+        if (officialAccountOrderRateAll == null) {
+            officialAccountOrderRateAll = 0.0;
+        }
+
+        if (officialAccountOrderRateDay == null) {
+            officialAccountOrderRateDay = 0.0;
+        }
+
+        if (officialAccountOrderCostAll == null) {
+            officialAccountOrderCostAll = 0L;
+        }
+
+        if (officialAccountOrderCostDay == null) {
+            officialAccountOrderCostDay = 0L;
+        }
+
+        if (officialAccountOrderAmountTotal == null) {
+            officialAccountOrderAmountTotal = 0L;
+        }
+
+        if (officialAccountOrderAmountDay == null) {
+            officialAccountOrderAmountDay = 0L;
+        }
+
+        if (officialAccountFirstDayOrderAmountTotal == null) {
+            officialAccountFirstDayOrderAmountTotal = 0L;
+        }
+
+        if (officialAccountFirstDayOrderAmountDay == null) {
+            officialAccountFirstDayOrderAmountDay = 0L;
+        }
+
+        if (officialAccountOrderRoiAll == null) {
+            officialAccountOrderRoiAll = 0L;
+        }
+
+        if (officialAccountOrderRoiDay == null) {
+            officialAccountOrderRoiDay = 0L;
+        }
+
+        if (officialAccountConsultCountTotal == null) {
+            officialAccountConsultCountTotal = 0L;
+        }
+
+        if (officialAccountConsultCountDay == null) {
+            officialAccountConsultCountDay = 0L;
+        }
+
+        if (officialAccountReaderCountTotal == null) {
+            officialAccountReaderCountTotal = 0L;
+        }
+
+        if (officialAccountReaderCountDay == null) {
+            officialAccountReaderCountDay = 0L;
+        }
+
+        if (officialAccountCreditApplyUserCountTotal == null) {
+            officialAccountCreditApplyUserCountTotal = 0L;
+        }
+
+        if (officialAccountCreditApplyUserCountDay == null) {
+            officialAccountCreditApplyUserCountDay = 0L;
+        }
+
+        if (officialAccountCreditUserCountTotal == null) {
+            officialAccountCreditUserCountTotal = 0L;
+        }
+
+        if (officialAccountCreditUserCountDay == null) {
+            officialAccountCreditUserCountDay = 0L;
+        }
+
+        if (forwardCountTotal == null) {
+            forwardCountTotal = 0L;
+        }
+
+        if (forwardCountDay == null) {
+            forwardCountDay = 0L;
+        }
+
+        if (forwardUserCountTotal == null) {
+            forwardUserCountTotal = 0L;
+        }
+
+        if (forwardUserCountDay == null) {
+            forwardUserCountDay = 0L;
+        }
+
+        if (noInterestCountTotal == null) {
+            noInterestCountTotal = 0L;
+        }
+
+        if (noInterestCountDay == null) {
+            noInterestCountDay = 0L;
+        }
+    }
 }
 }

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/AdStatOfMinuteDWD.java

@@ -1831,7 +1831,7 @@ public class AdStatOfMinuteDWD {
             result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + yesterdayMinuteDWD.getCostDeviationRateDay());
             result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + yesterdayMinuteDWD.getCostDeviationRateDay());
             result.setCostTotal(result.getCostTotal() + yesterdayMinuteDWD.getCostDay());
             result.setCostTotal(result.getCostTotal() + yesterdayMinuteDWD.getCostDay());
             result.setCompensationAmountTotal(result.getCompensationAmountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
             result.setCompensationAmountTotal(result.getCompensationAmountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
-            result.setViewCountTotal(result.getViewCountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
+            result.setViewCountTotal(result.getViewCountTotal() + yesterdayMinuteDWD.getViewCountDay());
             result.setValidClickCountTotal(result.getValidClickCountTotal() + yesterdayMinuteDWD.getValidClickCountDay());
             result.setValidClickCountTotal(result.getValidClickCountTotal() + yesterdayMinuteDWD.getValidClickCountDay());
             result.setValuableClickCountTotal(result.getValuableClickCountTotal() + yesterdayMinuteDWD.getValuableClickCountDay());
             result.setValuableClickCountTotal(result.getValuableClickCountTotal() + yesterdayMinuteDWD.getValuableClickCountDay());
             result.setConversionsCountTotal(result.getConversionsCountTotal() + yesterdayMinuteDWD.getConversionsCountDay());
             result.setConversionsCountTotal(result.getConversionsCountTotal() + yesterdayMinuteDWD.getConversionsCountDay());
@@ -1868,7 +1868,7 @@ public class AdStatOfMinuteDWD {
         result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + result.getCostDeviationRateDay());
         result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + result.getCostDeviationRateDay());
         result.setCostTotal(result.getCostTotal() + result.getCostDay());
         result.setCostTotal(result.getCostTotal() + result.getCostDay());
         result.setCompensationAmountTotal(result.getCompensationAmountTotal() + result.getCompensationAmountDay());
         result.setCompensationAmountTotal(result.getCompensationAmountTotal() + result.getCompensationAmountDay());
-        result.setViewCountTotal(result.getViewCountTotal() + result.getCompensationAmountDay());
+        result.setViewCountTotal(result.getViewCountTotal() + result.getViewCountDay());
         result.setValidClickCountTotal(result.getValidClickCountTotal() + result.getValidClickCountDay());
         result.setValidClickCountTotal(result.getValidClickCountTotal() + result.getValidClickCountDay());
         result.setValuableClickCountTotal(result.getValuableClickCountTotal() + result.getValuableClickCountDay());
         result.setValuableClickCountTotal(result.getValuableClickCountTotal() + result.getValuableClickCountDay());
         result.setConversionsCountTotal(result.getConversionsCountTotal() + result.getConversionsCountDay());
         result.setConversionsCountTotal(result.getConversionsCountTotal() + result.getConversionsCountDay());

+ 2 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/PlanStatOfMinuteDWD.java

@@ -1865,7 +1865,7 @@ public class PlanStatOfMinuteDWD {
             result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + yesterdayMinuteDWD.getCostDeviationRateDay());
             result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + yesterdayMinuteDWD.getCostDeviationRateDay());
             result.setCostTotal(result.getCostTotal() + yesterdayMinuteDWD.getCostDay());
             result.setCostTotal(result.getCostTotal() + yesterdayMinuteDWD.getCostDay());
             result.setCompensationAmountTotal(result.getCompensationAmountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
             result.setCompensationAmountTotal(result.getCompensationAmountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
-            result.setViewCountTotal(result.getViewCountTotal() + yesterdayMinuteDWD.getCompensationAmountDay());
+            result.setViewCountTotal(result.getViewCountTotal() + yesterdayMinuteDWD.getViewCountDay());
             result.setValidClickCountTotal(result.getValidClickCountTotal() + yesterdayMinuteDWD.getValidClickCountDay());
             result.setValidClickCountTotal(result.getValidClickCountTotal() + yesterdayMinuteDWD.getValidClickCountDay());
             result.setValuableClickCountTotal(result.getValuableClickCountTotal() + yesterdayMinuteDWD.getValuableClickCountDay());
             result.setValuableClickCountTotal(result.getValuableClickCountTotal() + yesterdayMinuteDWD.getValuableClickCountDay());
             result.setConversionsCountTotal(result.getConversionsCountTotal() + yesterdayMinuteDWD.getConversionsCountDay());
             result.setConversionsCountTotal(result.getConversionsCountTotal() + yesterdayMinuteDWD.getConversionsCountDay());
@@ -1902,7 +1902,7 @@ public class PlanStatOfMinuteDWD {
         result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + result.getCostDeviationRateDay());
         result.setCostDeviationRateTotal(result.getCostDeviationRateTotal() + result.getCostDeviationRateDay());
         result.setCostTotal(result.getCostTotal() + result.getCostDay());
         result.setCostTotal(result.getCostTotal() + result.getCostDay());
         result.setCompensationAmountTotal(result.getCompensationAmountTotal() + result.getCompensationAmountDay());
         result.setCompensationAmountTotal(result.getCompensationAmountTotal() + result.getCompensationAmountDay());
-        result.setViewCountTotal(result.getViewCountTotal() + result.getCompensationAmountDay());
+        result.setViewCountTotal(result.getViewCountTotal() + result.getViewCountDay());
         result.setValidClickCountTotal(result.getValidClickCountTotal() + result.getValidClickCountDay());
         result.setValidClickCountTotal(result.getValidClickCountTotal() + result.getValidClickCountDay());
         result.setValuableClickCountTotal(result.getValuableClickCountTotal() + result.getValuableClickCountDay());
         result.setValuableClickCountTotal(result.getValuableClickCountTotal() + result.getValuableClickCountDay());
         result.setConversionsCountTotal(result.getConversionsCountTotal() + result.getConversionsCountDay());
         result.setConversionsCountTotal(result.getConversionsCountTotal() + result.getConversionsCountDay());

+ 10 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/properties/ApplicationProperties.java

@@ -6,4 +6,14 @@ public class ApplicationProperties {
     public static final String MAX_COMPUTE_ACCOUNT_ENDPOINT = "maxCompute.endpoint";
     public static final String MAX_COMPUTE_ACCOUNT_ENDPOINT = "maxCompute.endpoint";
     public static final String MAX_COMPUTE_ACCOUNT_PROJECT_NAME = "maxCompute.projectName";
     public static final String MAX_COMPUTE_ACCOUNT_PROJECT_NAME = "maxCompute.projectName";
     public static final String MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT = "maxCompute.tunnelEndpoint";
     public static final String MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT = "maxCompute.tunnelEndpoint";
+
+    public static final String CK_URL = "clickhouse.url";
+    public static final String CK_USERNAME = "clickhouse.username";
+    public static final String CK_PASSWORD = "clickhouse.password";
+    public static final String CK_DRIVER = "clickhouse.driverClassName";
+
+    public static final String OSS_ASSESS_KEY_ID = "oss.accessKeyId";
+    public static final String OSS_ASSESS_KEY_SECRET = "oss.accessKeySecret";
+    public static final String OSS_ENDPOINT = "oss.endpoint";
+    public static final String OSS_BUCKET = "oss.bucket";
 }
 }

+ 89 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java

@@ -0,0 +1,89 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.util.Collector;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class AdDayDWDRollMonthProcess extends ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow> {
+    private Odps odps;
+    // 上次查询的时间
+    private ValueState<String> lastQueryDayState;
+    // 之前聚合的昨天的数据
+    private MapState<String, PlanStatOfDayDWD> historyReduceState;
+
+    @Override
+    public void open(Configuration conf) throws Exception {
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
+        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
+                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
+        odps = new Odps(account);
+        odps.getRestClient().setRetryLogger(new MaxComputeLog());
+        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
+        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+
+        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", String.class, PlanStatOfDayDWD.class));
+    }
+
+    @Override
+    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
+                        Iterable<AdDataOfDayODS> iterable, Collector<PlanStatOfDayDWD> collector) throws Exception {
+        AdDataOfDayODS element = iterable.iterator().next();
+        LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
+        long now = System.currentTimeMillis();
+
+        String lastQueryDay = lastQueryDayState.value();
+        // 从 maxCompute查找广告的历史数据
+        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(LocalDate.now()))) {
+            LocalDate endTime = LocalDate.now(), beginTime = statDay.minusDays(60);
+            String sql = "SELECT * FROM plan_stat_of_day_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginTime) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endTime) + "\" AND campaign_id = " + element.getCampaignId() + ";";
+            Instance instance = SQLTask.run(odps, sql);
+            // log.error("sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
+            instance.waitForSuccess();
+            List<Record> records = SQLTask.getResult(instance);
+            Map<String, PlanStatOfDayDWD> historyData = records.stream()
+                    .map(PlanStatOfDayDWD::byMaxCompute)
+                    .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
+                    .collect(Collectors.toMap(PlanStatOfDayDWD::getStatDay, data -> data, (val1, val2) -> val2));
+            historyReduceState.clear();
+            historyReduceState.putAll(historyData);
+            lastQueryDayState.update(DateUtil.formatLocalDate(LocalDate.now()));
+        }
+        PlanStatOfDayDWD lastReduceData = null;
+        for (int i = 1; i <= 60; i++) {
+            lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(statDay.minusDays(i)));
+            if (lastReduceData != null) {
+                break;
+            }
+        }
+        PlanStatOfDayDWD newStatData = PlanStatOfDayDWD.reduce(lastReduceData, element, now);
+        historyReduceState.put(DateUtil.formatLocalDate(statDay), newStatData);
+        collector.collect(newStatData);
+    }
+}

+ 31 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollYearProcess.java

@@ -0,0 +1,31 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfDayDWD;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.util.Collector;
+
+@Slf4j
+public class AdDayDWDRollYearProcess extends ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow> {
+    
+    // 上次聚合的结果
+    private ValueState<PlanStatOfDayDWD> lastReduceState;
+
+    public void open(Configuration conf) {
+        lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", PlanStatOfDayDWD.class));
+    }
+
+    @Override
+    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
+                        Iterable<AdDataOfDayODS> elements, Collector<PlanStatOfDayDWD> out) throws Exception {
+        AdDataOfDayODS element = elements.iterator().next();
+        PlanStatOfDayDWD newStatDWD = PlanStatOfDayDWD.reduce(lastReduceState.value(), element, System.currentTimeMillis());
+        out.collect(newStatDWD);
+        lastReduceState.update(newStatDWD);
+    }
+}

+ 0 - 6
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -3,14 +3,10 @@ package flink.zanxiangnet.ad.monitoring.process;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Collector;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -19,8 +15,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Statement;
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;

+ 0 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -3,15 +3,10 @@ package flink.zanxiangnet.ad.monitoring.process;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Collector;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -20,11 +15,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Statement;
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.List;
 import java.util.Properties;
 import java.util.Properties;
 
 

+ 0 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteHourProcess.java

@@ -2,10 +2,8 @@ package flink.zanxiangnet.ad.monitoring.process;
 
 
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -22,7 +20,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.List;
 import java.util.Properties;
 import java.util.Properties;
 
 

+ 0 - 14
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -1,22 +1,9 @@
 package flink.zanxiangnet.ad.monitoring.process;
 package flink.zanxiangnet.ad.monitoring.process;
 
 
-import com.aliyun.odps.Instance;
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.task.SQLTask;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.AdStatJob;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
 import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
-import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
-import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
 import flink.zanxiangnet.ad.monitoring.util.DateUtil;
-import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -32,7 +19,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.*;
-import java.util.stream.Collectors;
 
 
 public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow> {
 public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow> {
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
     private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");

+ 58 - 73
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/ClickhouseBatchStreamSink.java

@@ -1,79 +1,72 @@
 package flink.zanxiangnet.ad.monitoring.sink;
 package flink.zanxiangnet.ad.monitoring.sink;
 
 
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.PartitionSpec;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
-import com.aliyun.odps.data.Record;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.tunnel.TunnelException;
 import com.aliyun.odps.tunnel.TunnelException;
-import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
 import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
-import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
-import org.apache.commons.lang3.StringUtils;
+import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ibatis.datasource.DataSourceFactory;
+import org.apache.ibatis.mapping.Environment;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
+import org.springframework.beans.BeanUtils;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.CollectionUtils;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.*;
 
 
 /**
 /**
  * 批量数据写出
  * 批量数据写出
  *
  *
  * @param <IN>
  * @param <IN>
  */
  */
+@Slf4j
 public class ClickhouseBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<IN> {
 public class ClickhouseBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<IN> {
-    private static final Logger log = LoggerFactory.getLogger(ClickhouseBatchStreamSink.class);
 
 
     // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
     // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
     private static final Object DUMMY_LOCK = new Object();
     private static final Object DUMMY_LOCK = new Object();
 
 
-    private final Class<T> clazz;
     private String projectName;
     private String projectName;
     private String tableName;
     private String tableName;
 
 
     private volatile transient TableTunnel tunnel;
     private volatile transient TableTunnel tunnel;
     private volatile transient List<BeanUtil.FieldInfo> fieldInfoList;
     private volatile transient List<BeanUtil.FieldInfo> fieldInfoList;
     private volatile transient Map<String, Method> partitionFieldMethods;
     private volatile transient Map<String, Method> partitionFieldMethods;
+    private SqlSessionFactory sqlSessionFactory;
 
 
-    public ClickhouseBatchStreamSink(Class<T> clazz) {
-        this.clazz = clazz;
-    }
 
 
     @Override
     @Override
-    public void open(Configuration config) {
-        if (tunnel == null) {
-            synchronized (DUMMY_LOCK) {
-                if (tunnel == null) {
-                    Map<String, String> params = getRuntimeContext()
-                            .getExecutionConfig()
-                            .getGlobalJobParameters()
-                            .toMap();
-                    MaxComputeTable tableAnnotation = clazz.getAnnotation(MaxComputeTable.class);
+    public void open(Configuration config) throws Exception {
+        Map<String, String> params = getRuntimeContext()
+                .getExecutionConfig()
+                .getGlobalJobParameters()
+                .toMap();
 
 
-                    Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
-                            params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
-                    Odps odps = new Odps(account);
-                    odps.getRestClient().setRetryLogger(new MaxComputeLog());
-                    odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
-                    odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
-                    tunnel = new TableTunnel(odps);
-                    tunnel.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT));
-                    projectName = params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME);
-                    tableName = tableAnnotation.value();
-                    fieldInfoList = BeanUtil.parseBeanField(clazz);
-                    partitionFieldMethods = fieldInfoList.stream().filter(BeanUtil.FieldInfo::isUsePartitioned).collect(Collectors.toMap(BeanUtil.FieldInfo::getColumnName, BeanUtil.FieldInfo::getGetMethod));
-                }
-            }
-        }
+        Properties ckProps = new Properties();
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+
+        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
+        dataSourceFactory.setProperties(ckProps);
+        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
+        // 开启驼峰规则
+        configuration.setMapUnderscoreToCamelCase(true);
+        configuration.getTypeAliasRegistry().registerAlias(AdStatOfDayDWD.class);
+        // addMapper一定要放到 alias的后面!!!!!
+        configuration.addMapper(AdStatOfDayDWDMapper.class);
+        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
     }
     }
 
 
     /**
     /**
@@ -84,37 +77,29 @@ public class ClickhouseBatchStreamSink<T, IN extends List<T>> extends RichSinkFu
      */
      */
     @Override
     @Override
     public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
     public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
-        T element = value.get(0);
-        String partitionStr = generatePartitionStr(element);
-        System.out.println("[" + tableName + "]写入数据量:" + value.size() + "写入分区:" + partitionStr);
-        TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr), true);
-        TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
-        for (T t : value) {
-            Record record = uploadSession.newRecord();
-            for (BeanUtil.FieldInfo fieldInfo : fieldInfoList) {
-                if (fieldInfo.isUsePartitioned()) {
-                    // 分区字段不在这里设值
-                    continue;
-                }
-                Object obj = fieldInfo.getGetMethod().invoke(t);
-                record.set(fieldInfo.getColumnName(), obj);
-            }
-            // append只是写入内存
-            pack.append(record);
+        AdStatOfDayDWD adStatOfDayDWD = new AdStatOfDayDWD();
+        adStatOfDayDWD.setStatDay("2021-12-01");
+        adStatOfDayDWD.setAccountId(100L);
+        adStatOfDayDWD.setCreateTime(new Date());
+        adStatOfDayDWD.setCostDeviationRateTotal(39.5);
+        adStatOfDayDWD.removeNull();
+
+        List<AdStatOfDayDWD> list = new ArrayList<>(6);
+        for (int i = 0; i < 3; i ++) {
+            AdStatOfDayDWD temp = new AdStatOfDayDWD();
+            BeanUtils.copyProperties(adStatOfDayDWD, temp);
+            temp.setAdId((long) i);
+            list.add(temp);
+        }
+        list.add(adStatOfDayDWD);
+        log.error("准备写入:{}", value.size());
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
+            mapper.add(adStatOfDayDWD);
+            mapper.addBatch(list);
+            List<AdStatOfDayDWD> all = mapper.selectAll();
+            log.error("查询结果:{} | {}", all.size(), JsonUtil.toString(list));
         }
         }
-        int retry = 0;
-        do {
-            try {
-                // 大概用时 100ms ~ 3s
-                pack.flush();
-                break;
-            } catch (IOException e) {
-                if (retry == 3) {
-                    log.error("Flush data error!msg: " + e.getMessage());
-                    throw e;
-                }
-            }
-        } while (retry++ < 3);
     }
     }
 
 
     @Override
     @Override
@@ -122,7 +107,7 @@ public class ClickhouseBatchStreamSink<T, IN extends List<T>> extends RichSinkFu
         super.close();
         super.close();
     }
     }
 
 
-    private String generatePartitionStr(T t) {
+    private String generateSQL(T t) {
         if (CollectionUtils.isEmpty(partitionFieldMethods)) {
         if (CollectionUtils.isEmpty(partitionFieldMethods)) {
             return null;
             return null;
         }
         }