Browse Source

天数据写入 mysql

wcc 3 năm trước cách đây
mục cha
commit
087c520fb9
19 tập tin đã thay đổi với 157 bổ sung82 xóa
  1. 7 0
      flink-ad-monitoring/pom.xml
  2. 0 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java
  3. 6 4
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java
  4. 1 1
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java
  5. 5 7
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/config/ClickhouseDataSourceFactory.java
  6. 47 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/config/MysqlDataSourceFactory.java
  7. 4 4
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.xml
  8. 10 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/properties/ApplicationProperties.java
  9. 11 8
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java
  10. 11 8
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java
  11. 12 8
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java
  12. 13 10
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java
  13. 3 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java
  14. 3 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java
  15. 3 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java
  16. 12 8
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToCkBatchSink.java
  17. 3 5
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdHourDMToCkBatchSink.java
  18. 3 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdMinuteDMToCkBatchSink.java
  19. 3 3
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/PlanDayDWDToCkBatchSink.java

+ 7 - 0
flink-ad-monitoring/pom.xml

@@ -175,6 +175,13 @@ under the License.
             <artifactId>commons-csv</artifactId>
             <version>1.9.0</version>
         </dependency>
+
+        <!-- mysql -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.27</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 0 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdDayStreamJob.java

@@ -68,7 +68,6 @@ public class AdDayStreamJob {
         }
 
         KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adDayConsumerGroup);
-
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "adDaySource_kafka");
 
         // 广告日数据。往前回滚 10天

+ 6 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdHourStreamJob.java

@@ -72,7 +72,7 @@ public class AdHourStreamJob {
         }
 
         KafkaSource<String> adStreamOfMinuteSource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adHourTopic, KafkaComponent.KafkaTopic.KafkaGroupId.adHourConsumerGroup);
-        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka");
+        DataStreamSource<String> adStreamOfMinuteIn = env.fromSource(adStreamOfMinuteSource, WatermarkStrategy.noWatermarks(), "adHourSource_kafka").setParallelism(12);
 
         // 广告分钟数据(前 5分钟的广告消耗数据)
         final OutputTag<AdDataOfMinuteODS> adMinuteStreamTag = new OutputTag<AdDataOfMinuteODS>("adMinuteStream") {
@@ -82,8 +82,9 @@ public class AdHourStreamJob {
         };
 
         // 对流进行映射,拆分(实时的分钟流和回滚的小时流)
-        SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn.filter(StringUtils::isNotBlank)
-                .process(new AdHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag));
+        SingleOutputStreamOperator<AdDataOfMinuteODS> adODSStream = adStreamOfMinuteIn
+                .filter(StringUtils::isNotBlank).setParallelism(12)
+                .process(new AdHourDTOStreamProcess(adMinuteStreamTag, adHourStreamTag)).setParallelism(12);
 
         // 分钟流
         DataStream<AdDataOfMinuteODS> adMinuteODSStream = adODSStream.getSideOutput(adMinuteStreamTag);
@@ -102,7 +103,8 @@ public class AdHourStreamJob {
                 // 开一个 5分钟的滚动窗口
                 .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
                 .trigger(new AdMinuteODSStreamTrigger())
-                .process(new AdMinuteDWDProcess());
+                .process(new AdMinuteDWDProcess())
+                .setParallelism(12);
         new KeyedBatchStream<>("adMinuteDWDStream", adMinuteDWDStream.keyBy(AdStatOfMinuteDWD::getStatDay), 1000L, 60 * 1000L)
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfMinuteDWD.class))

+ 1 - 1
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/PlanDayStreamJob.java

@@ -65,8 +65,8 @@ public class PlanDayStreamJob {
         if (StringUtils.isNotBlank(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT))) {
             env.getCheckpointConfig().setCheckpointStorage(props.getProperty(ApplicationProperties.CHECKPOINT_SAVEPOINT));
         }
-        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planDayConsumerGroup);
 
+        KafkaSource<String> adStreamOfDaySource = KafkaComponent.buildKafkaSource(props, KafkaComponent.KafkaTopic.adDayTopic, KafkaComponent.KafkaTopic.KafkaGroupId.planDayConsumerGroup);
         DataStreamSource<String> adStreamOfDayIn = env.fromSource(adStreamOfDaySource, WatermarkStrategy.noWatermarks(), "planDaySource_kafka");
 
         // 广告日数据。往前回滚 10天

+ 5 - 7
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/config/ClickhouseDataSourceFactory.java

@@ -2,6 +2,7 @@ package flink.zanxiangnet.ad.monitoring.config;
 
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.ibatis.datasource.DataSourceFactory;
 import ru.yandex.clickhouse.BalancedClickhouseDataSource;
@@ -16,10 +17,6 @@ import java.util.Properties;
 @Slf4j
 public class ClickhouseDataSourceFactory implements DataSourceFactory {
 
-    public static final String PROP_URL = "url";
-    public static final String PROP_USER = "user";
-    public static final String PROP_PASSWORD = "password";
-
     private Properties props;
 
     @Override
@@ -30,9 +27,10 @@ public class ClickhouseDataSourceFactory implements DataSourceFactory {
     @Override
     public DataSource getDataSource() {
         ClickHouseProperties ckProps = new ClickHouseProperties();
-        ckProps.setUser(props.getProperty(PROP_USER));
-        ckProps.setPassword(props.getProperty(PROP_PASSWORD));
-        DataSource dataSource = new BalancedClickhouseDataSource(props.getProperty(PROP_URL), ckProps);
+        ckProps.setUser(props.getProperty(ApplicationProperties.CK_USERNAME));
+        ckProps.setPassword(props.getProperty(ApplicationProperties.CK_PASSWORD));
+        ckProps.setSocketTimeout(60 * 1000);
+        DataSource dataSource = new BalancedClickhouseDataSource(props.getProperty(ApplicationProperties.CK_URL), ckProps);
 
         HikariConfig config = new HikariConfig();
         // 此处还可以配置连接池大小,连接超时之类的

+ 47 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/config/MysqlDataSourceFactory.java

@@ -0,0 +1,47 @@
+package flink.zanxiangnet.ad.monitoring.config;
+
+import com.mysql.cj.jdbc.MysqlDataSource;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ibatis.datasource.DataSourceFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Clickhouse连接池
+ */
+@Slf4j
+public class MysqlDataSourceFactory implements DataSourceFactory {
+
+    private Properties props;
+
+    @Override
+    public void setProperties(Properties props) {
+        this.props = props;
+    }
+
+    @Override
+    public DataSource getDataSource() {
+        MysqlDataSource dataSource = new MysqlDataSource();
+        dataSource.setUser(props.getProperty(ApplicationProperties.MYSQL_USERNAME));
+        dataSource.setPassword(props.getProperty(ApplicationProperties.MYSQL_PASSWORD));
+        dataSource.setUrl(props.getProperty(ApplicationProperties.MYSQL_URL));
+        try {
+            dataSource.setConnectTimeout(30000);
+            dataSource.setInitialTimeout(600000);
+            dataSource.setSocketTimeout(60 * 1000);
+        } catch (SQLException e) {
+            log.error(e.getMessage(), e);
+        }
+        HikariConfig config = new HikariConfig();
+        config.setDataSource(dataSource);
+        return new HikariDataSource(config);
+    }
+}

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/dao/mapper/AdStatOfDayDWDMapper.xml

@@ -97,8 +97,8 @@
         order by stat_day desc, create_time desc limit ${queryCount}
     </select>
 
-    <insert id="add">
-        INSERT INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
+    <insert id="add" useGeneratedKeys="false" keyColumn="stat_day,ad_id" keyProperty="item.statDay,item.adId">
+        REPLACE INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
         VALUES
         ( #{item.statDay}, #{item.accountId},
         #{item.campaignId}, #{item.agencyAccountId},
@@ -179,8 +179,8 @@
         #{item.noInterestCountTotal}, #{item.noInterestCountDay})
     </insert>
 
-    <insert id="addBatch">
-        INSERT INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
+    <insert id="addBatch" useGeneratedKeys="false" keyColumn="stat_day,ad_id" keyProperty="statDay,adId">
+        REPLACE INTO ad_stat_of_day(<include refid="Base_Column_List"/>)
         VALUES
         <foreach collection="list" index="index" item="item" separator=",">
             ( #{item.statDay}, #{item.accountId},

+ 10 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/properties/ApplicationProperties.java

@@ -7,15 +7,22 @@ public class ApplicationProperties {
     public static final String MAX_COMPUTE_ACCOUNT_PROJECT_NAME = "maxCompute.projectName";
     public static final String MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT = "maxCompute.tunnelEndpoint";
 
+    public static final String CHECKPOINT_SAVEPOINT = "checkpoint.savePath";
+
+    public static final String CK_DRIVER_CLASS_NAME = "clickhouse.driverClassName";
     public static final String CK_URL = "clickhouse.url";
     public static final String CK_USERNAME = "clickhouse.username";
     public static final String CK_PASSWORD = "clickhouse.password";
-    public static final String CK_DRIVER = "clickhouse.driverClassName";
+
+    public static final String MYSQL_DRIVER_CLASS_NAME = "mysql.driverClassName";
+    public static final String MYSQL_URL = "mysql.url";
+    public static final String MYSQL_USERNAME = "mysql.username";
+    public static final String MYSQL_PASSWORD = "mysql.password";
+    public static final String MYSQL_INIT_POOL_SIZE = "mysql.initPoolSize";
+    public static final String MYSQL_MAX_POOL_SIZE = "mysql.maxPoolSize";
 
     public static final String OSS_ASSESS_KEY_ID = "oss.accessKeyId";
     public static final String OSS_ASSESS_KEY_SECRET = "oss.accessKeySecret";
     public static final String OSS_ENDPOINT = "oss.endpoint";
     public static final String OSS_BUCKET = "oss.bucket";
-
-    public static final String CHECKPOINT_SAVEPOINT = "checkpoint.savePath";
 }

+ 11 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayDWDRollMonthProcess.java

@@ -1,6 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.config.MysqlDataSourceFactory;
 import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfDayODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
@@ -43,14 +43,17 @@ public class AdDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDataO
                 .getGlobalJobParameters()
                 .toMap();
 
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        Properties mysqlProps = new Properties();
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME, params.get(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_URL, params.get(ApplicationProperties.MYSQL_URL));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_USERNAME, params.get(ApplicationProperties.MYSQL_USERNAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_PASSWORD, params.get(ApplicationProperties.MYSQL_PASSWORD));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_INIT_POOL_SIZE, params.get(ApplicationProperties.MYSQL_INIT_POOL_SIZE));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_MAX_POOL_SIZE, params.get(ApplicationProperties.MYSQL_MAX_POOL_SIZE));
 
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        DataSourceFactory dataSourceFactory = new MysqlDataSourceFactory();
+        dataSourceFactory.setProperties(mysqlProps);
+        Environment environment = new Environment("mysql", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
         org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
         // 开启驼峰规则
         configuration.setMapUnderscoreToCamelCase(true);

+ 11 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdDayOnTimeStreamCompletionProcess.java

@@ -1,6 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.config.MysqlDataSourceFactory;
 import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
@@ -40,14 +40,17 @@ public class AdDayOnTimeStreamCompletionProcess extends KeyedProcessFunction<Lon
                 .getGlobalJobParameters()
                 .toMap();
 
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        Properties mysqlProps = new Properties();
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME, params.get(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_URL, params.get(ApplicationProperties.MYSQL_URL));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_USERNAME, params.get(ApplicationProperties.MYSQL_USERNAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_PASSWORD, params.get(ApplicationProperties.MYSQL_PASSWORD));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_INIT_POOL_SIZE, params.get(ApplicationProperties.MYSQL_INIT_POOL_SIZE));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_MAX_POOL_SIZE, params.get(ApplicationProperties.MYSQL_MAX_POOL_SIZE));
 
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        DataSourceFactory dataSourceFactory = new MysqlDataSourceFactory();
+        dataSourceFactory.setProperties(mysqlProps);
+        Environment environment = new Environment("mysql", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
         org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
         // 开启驼峰规则
         configuration.setMapUnderscoreToCamelCase(true);

+ 12 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdHourDWDProcess.java

@@ -1,6 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.config.MysqlDataSourceFactory;
 import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfHourODS;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
@@ -43,14 +43,18 @@ public class AdHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourODS
                 .getGlobalJobParameters()
                 .toMap();
 
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        Properties mysqlProps = new Properties();
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME, params.get(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_URL, params.get(ApplicationProperties.MYSQL_URL));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_USERNAME, params.get(ApplicationProperties.MYSQL_USERNAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_PASSWORD, params.get(ApplicationProperties.MYSQL_PASSWORD));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_INIT_POOL_SIZE, params.get(ApplicationProperties.MYSQL_INIT_POOL_SIZE));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_MAX_POOL_SIZE, params.get(ApplicationProperties.MYSQL_MAX_POOL_SIZE));
+
+        DataSourceFactory dataSourceFactory = new MysqlDataSourceFactory();
+        dataSourceFactory.setProperties(mysqlProps);
+        Environment environment = new Environment("mysql", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
         org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
         // 开启驼峰规则
         configuration.setMapUnderscoreToCamelCase(true);

+ 13 - 10
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/AdMinuteDWDProcess.java

@@ -1,6 +1,6 @@
 package flink.zanxiangnet.ad.monitoring.process;
 
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.config.MysqlDataSourceFactory;
 import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
@@ -49,14 +49,17 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
                 .getGlobalJobParameters()
                 .toMap();
 
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
-
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        Properties mysqlProps = new Properties();
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME, params.get(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_URL, params.get(ApplicationProperties.MYSQL_URL));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_USERNAME, params.get(ApplicationProperties.MYSQL_USERNAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_PASSWORD, params.get(ApplicationProperties.MYSQL_PASSWORD));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_INIT_POOL_SIZE, params.get(ApplicationProperties.MYSQL_INIT_POOL_SIZE));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_MAX_POOL_SIZE, params.get(ApplicationProperties.MYSQL_MAX_POOL_SIZE));
+
+        DataSourceFactory dataSourceFactory = new MysqlDataSourceFactory();
+        dataSourceFactory.setProperties(mysqlProps);
+        Environment environment = new Environment("mysql", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
         org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
         // 开启驼峰规则
         configuration.setMapUnderscoreToCamelCase(true);
@@ -145,7 +148,7 @@ public class AdMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteODS,
                     LocalDateTime hourStatTime = LocalDateTime.of(beginDate, LocalTime.of(hour, 0, 0));
                     // 找今天聚合的最后一条历史数据
                     AdStatOfMinuteDWD temp = null;
-                    for (int i = hour - 1; i >= 0; i++) {
+                    for (int i = hour - 1; i >= 0; i--) {
                         temp = lastReduceState.get(LocalDateTime.of(beginDate, LocalTime.of(i, 55, 0)).format(formatForLastReduceKey));
                         if (temp != null) {
                             break;

+ 3 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanDayDWDRollMonthProcess.java

@@ -45,9 +45,9 @@ public class PlanDayDWDRollMonthProcess extends KeyedProcessFunction<Long, AdDat
                 .toMap();
 
         Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
         DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
         dataSourceFactory.setProperties(ckProps);

+ 3 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanHourDWDProcess.java

@@ -50,9 +50,9 @@ public class PlanHourDWDProcess extends KeyedProcessFunction<Long, AdDataOfHourO
                 .toMap();
 
         Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
         DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
         dataSourceFactory.setProperties(ckProps);

+ 3 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/PlanMinuteDWDProcess.java

@@ -58,9 +58,9 @@ public class PlanMinuteDWDProcess extends ProcessWindowFunction<AdDataOfMinuteOD
                 .toMap();
 
         Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
         DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
         dataSourceFactory.setProperties(ckProps);

+ 12 - 8
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdDayDWDToCkBatchSink.java

@@ -1,7 +1,7 @@
 package flink.zanxiangnet.ad.monitoring.sink;
 
 import com.aliyun.odps.tunnel.TunnelException;
-import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
+import flink.zanxiangnet.ad.monitoring.config.MysqlDataSourceFactory;
 import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
@@ -35,14 +35,17 @@ public class AdDayDWDToCkBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>
                 .getGlobalJobParameters()
                 .toMap();
 
-        Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        Properties mysqlProps = new Properties();
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME, params.get(ApplicationProperties.MYSQL_DRIVER_CLASS_NAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_URL, params.get(ApplicationProperties.MYSQL_URL));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_USERNAME, params.get(ApplicationProperties.MYSQL_USERNAME));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_PASSWORD, params.get(ApplicationProperties.MYSQL_PASSWORD));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_INIT_POOL_SIZE, params.get(ApplicationProperties.MYSQL_INIT_POOL_SIZE));
+        mysqlProps.setProperty(ApplicationProperties.MYSQL_MAX_POOL_SIZE, params.get(ApplicationProperties.MYSQL_MAX_POOL_SIZE));
 
-        DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
-        dataSourceFactory.setProperties(ckProps);
-        Environment environment = new Environment("clickhouse", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
+        DataSourceFactory dataSourceFactory = new MysqlDataSourceFactory();
+        dataSourceFactory.setProperties(mysqlProps);
+        Environment environment = new Environment("mysql", new JdbcTransactionFactory(), dataSourceFactory.getDataSource());
         org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
         // 开启驼峰规则
         configuration.setMapUnderscoreToCamelCase(true);
@@ -63,6 +66,7 @@ public class AdDayDWDToCkBatchSink extends RichSinkFunction<List<AdStatOfDayDWD>
         try (SqlSession session = sqlSessionFactory.openSession()) {
             AdStatOfDayDWDMapper mapper = session.getMapper(AdStatOfDayDWDMapper.class);
             mapper.addBatch(value);
+            session.commit();
         }
     }
 

+ 3 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdHourDMToCkBatchSink.java

@@ -2,9 +2,7 @@ package flink.zanxiangnet.ad.monitoring.sink;
 
 import com.aliyun.odps.tunnel.TunnelException;
 import flink.zanxiangnet.ad.monitoring.config.ClickhouseDataSourceFactory;
-import flink.zanxiangnet.ad.monitoring.dao.mapper.AdStatOfDayDWDMapper;
 import flink.zanxiangnet.ad.monitoring.dao.mapper.CostHourDMMapper;
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import lombok.extern.slf4j.Slf4j;
@@ -40,9 +38,9 @@ public class AdHourDMToCkBatchSink extends RichSinkFunction<List<CostHourDM>> {
                 .toMap();
 
         Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
         DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
         dataSourceFactory.setProperties(ckProps);

+ 3 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/AdMinuteDMToCkBatchSink.java

@@ -38,9 +38,9 @@ public class AdMinuteDMToCkBatchSink extends RichSinkFunction<List<CostMinuterDM
                 .toMap();
 
         Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
         DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
         dataSourceFactory.setProperties(ckProps);

+ 3 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/PlanDayDWDToCkBatchSink.java

@@ -38,9 +38,9 @@ public class PlanDayDWDToCkBatchSink extends RichSinkFunction<List<PlanStatOfDay
                 .toMap();
 
         Properties ckProps = new Properties();
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_URL, params.get(ApplicationProperties.CK_URL));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_USER, params.get(ApplicationProperties.CK_USERNAME));
-        ckProps.setProperty(ClickhouseDataSourceFactory.PROP_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
+        ckProps.setProperty(ApplicationProperties.CK_URL, params.get(ApplicationProperties.CK_URL));
+        ckProps.setProperty(ApplicationProperties.CK_USERNAME, params.get(ApplicationProperties.CK_USERNAME));
+        ckProps.setProperty(ApplicationProperties.CK_PASSWORD, params.get(ApplicationProperties.CK_PASSWORD));
 
         DataSourceFactory dataSourceFactory = new ClickhouseDataSourceFactory();
         dataSourceFactory.setProperties(ckProps);