Sfoglia il codice sorgente

MOD:添加服务商id字段

cxyu 3 anni fa
parent
commit
eaa69f00b9

+ 6 - 6
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -28,8 +28,7 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
         super.open(config);
         Properties props = new Properties();
         props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
-                "8123", "data_monitoring");
+        connection = ClickhouseUtil.getConn(props);
     }
 
     @Override
@@ -121,7 +120,7 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
                 "web_register_uv_hour,\n" +
                 "web_register_cost_total,\n" +
                 "web_register_cost_day,\n" +
-                "web_register_cost_hour) values " +
+                "web_register_cost_hour,agency_account_id) values " +
                 "(?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
@@ -130,7 +129,7 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
-                "?,?,?,?,?,?,?)";
+                "?,?,?,?,?,?,?,?)";
         log.error(costhour.toString());
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         preparedStatement.setString(1, costhour.dt);
@@ -220,12 +219,13 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
         preparedStatement.setDouble(85, costhour.webRegisterCostTotal);
         preparedStatement.setDouble(86, costhour.webRegisterCostDay);
         preparedStatement.setDouble(87, costhour.webRegisterCostHour);
+        preparedStatement.setString(88, costhour.agencyAccountId);
         preparedStatement.addBatch();
         long startTime = System.currentTimeMillis();
         int ints[] = preparedStatement.executeBatch();
         connection.commit();
         long endTime = System.currentTimeMillis();
-        log.error("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
+        log.info("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
         //clickhouse 处理重复数据
         //TODO:数据去重有问题,去除掉非最新的数据
 //        Statement statement_duplicate = connection.createStatement();
@@ -233,7 +233,7 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
 //        statement_duplicate.executeQuery(sql_duplicate);
 //        connection.commit();
         long endTime_dp = System.currentTimeMillis();
-        log.error("数据清理耗时: " + (endTime_dp - endTime));
+        log.info("数据清理耗时: " + (endTime_dp - endTime));
 
     }
 

+ 6 - 11
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -29,8 +29,7 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
         super.open(config);
         Properties props = new Properties();
         props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
-                "8123", "data_monitoring");
+        connection = ClickhouseUtil.getConn(props);
     }
 
     @Override
@@ -75,11 +74,11 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
                 "from_follow_rate_minute,\n" +
                 "web_register_count_minute,\n" +
                 "web_register_uv_minute,\n" +
-                "web_register_cost_minute) values " +
+                "web_register_cost_minute,agency_account_id) values " +
                 "(?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
-                "?,?,?,?,?,?,?,?,?,?,\n)";
+                "?,?,?,?,?,?,?,?,?,?,?\n)";
 //        log.error(costMinuterDM);
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         preparedStatement.setString(1, costMinuterDM.dt);
@@ -122,20 +121,16 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
         preparedStatement.setLong(38, costMinuterDM.webRegisterCountMinute);
         preparedStatement.setLong(39, costMinuterDM.webRegisterUvMinute);
         preparedStatement.setDouble(40, costMinuterDM.webRegisterCostMinute);
+        preparedStatement.setString(41, costMinuterDM.agencyAccountId);
 
         preparedStatement.addBatch();
         long startTime = System.currentTimeMillis();
         int ints[] = preparedStatement.executeBatch();
         connection.commit();
         long endTime = System.currentTimeMillis();
-        log.error("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
-        //TODO:数据去重有问题,去除掉非最新的数据
-//        Statement statement_duplicate = connection.createStatement();
-//        String sql_duplicate = "optimize table data_monitoring.cost_minute final;";
-//        statement_duplicate.executeQuery(sql_duplicate);
-//        connection.commit();
+        log.info("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
         long endTime_dp = System.currentTimeMillis();
-        log.error("数据清理耗时: " + (endTime_dp - endTime));
+        log.info("数据清理耗时: " + (endTime_dp - endTime));
 
     }
 

+ 3 - 3
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/ClickhouseUtil.java

@@ -14,17 +14,17 @@ public class ClickhouseUtil {
 //        public static
 //    }
 
-    public static Connection getConn(Properties props,String host, String port, String database) throws ClassNotFoundException, SQLException {
+    public static Connection getConn(Properties props) throws ClassNotFoundException, SQLException {
 
         String clickhouseUrl = props.getProperty("clickhouse.url");
         String clickhouseUser = props.getProperty("clickhouse.username");
         String clickhousePassword = props.getProperty("clickhouse.password");
 
-        Class.forName(clickhouseUrl);
+        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
         String user = clickhouseUser;
         String password = clickhousePassword;
 
-        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
+        String address = clickhouseUrl;
         connection = DriverManager.getConnection(address, user, password);
         return connection;
     }

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostHourDM.java

@@ -14,6 +14,8 @@ public class CostHourDM {
     public String hour;
     //广告id
     public String adId;
+    //服务商id
+    public String agencyAccountId;
     //广告组id
     public String adgroupId;
     //创意id

+ 2 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostMinuterDM.java

@@ -18,6 +18,8 @@ public class CostMinuterDM {
     public String createtime;
     //广告id
     public String adId;
+    //服务商id
+    public String agencyAccountId;
     //广告组id
     public String adgroupId;
     //创意id

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -29,8 +29,7 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
         props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
-                "8123", "data_monitoring");
+        connection = ClickhouseUtil.getConn(props);
     }
 
     //数据格式转换

+ 3 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -29,8 +29,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
         props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
-                "8123", "data_monitoring");
+        connection = ClickhouseUtil.getConn(props);
     }
 
     //数据格式转换
@@ -47,6 +46,8 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
         costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
         //广告id
         costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        //服务商id
+        costHourDM.agencyAccountId = adStatOfMinuteDWD.getAgencyAccountId().toString();
         //广告组id
         costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
         //创意id

+ 1 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteHourProcess.java

@@ -32,8 +32,7 @@ public class CostMinuteHourProcess extends ProcessWindowFunction<AdStatOfMinuteD
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
         props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
-                "8123", "data_monitoring");
+        connection = ClickhouseUtil.getConn(props);
     }
 
     //数据格式转换

+ 3 - 2
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -29,8 +29,7 @@ public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD,
     public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
         Properties props = new Properties();
         props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
-        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
-                "8123", "data_monitoring");
+        connection = ClickhouseUtil.getConn(props);
     }
 
     //数据格式转换
@@ -51,6 +50,8 @@ public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD,
         costMinuterDM.minute = tmpMinute;
         //广告id
         costMinuterDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        //服务商id
+        costMinuterDM.agencyAccountId = adStatOfMinuteDWD.getAgencyAccountId().toString();
         //广告组id
         costMinuterDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
         //创意id

+ 10 - 14
flink-ad-monitoring/src/main/resources/application.properties

@@ -1,16 +1,12 @@
-maxCompute.accountId=LTAI5tFuLw65UsH3tqru2K1h
-maxCompute.accountKey=p1F8my4ovgcEfs3HVORdmeLlLUUKRp
-maxCompute.projectName=zx_ad_monitoring
-#maxCompute.accountId=LTAI5tF8Vfwm8hVajbKewiDZ
-#maxCompute.accountKey=rjOgeffIRMV7r5T6LBsOC47GI60Xge
-#maxCompute.projectName=zx_test02
-#maxCompute.endpoint=http://service.cn-hangzhou.maxcompute.aliyun.com/api
+maxCompute.accountId=LTAI5tSJaRFpCyV7FMx37jEz
+maxCompute.accountKey=Of1C3AD0l3Ob9Ic3drEXJjL6BCZ1eD
+maxCompute.projectName=qc_ad_monitoring
 maxCompute.endpoint=http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
-#maxCompute.tunnelEndpoint=http://dt.cn-hangzhou.maxcompute.aliyun.com
 maxCompute.tunnelEndpoint=http://dt.cn-hangzhou.maxcompute.aliyun-inc.com
-kafka.servers=114.55.59.94:9093,112.124.33.132:9093
-kafka.username=alikafka_pre-cn-tl32fsx4l00x
-kafka.password=VOEdhZLjOrL76lrl5bqPtydtoEkbs0Ny
-# kafka.sslPath=D:\\Downloads\\kafka.client.truststore.jks
-kafka.sslPath=/root/flink-1.13.2/kafka.client.truststore.jks
-kafka.connModule=SASL_SSL
+kafka.servers=alikafka-pre-cn-2r42hduiq01t-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-2r42hduiq01t-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-2r42hduiq01t-3-vpc.alikafka.aliyuncs.com:9092
+kafka.username=alikafka_pre-cn-2r42hduiq01t
+kafka.password=7Xc0Te8IhhHX9rquEYIDiEsO4SgqmEIV
+kafka.connModule=VPC
+clickhouse.url=jdbc:clickhouse://cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com:8123/data_monitoring
+clickhouse.username=qc
+clickhouse.password=Qc_123456