Ver código fonte

ADD:结果表--clickhouse模块

cxyu 3 anos atrás
pai
commit
55251c33e3
17 arquivos alterados com 1987 adições e 0 exclusões
  1. 20 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/MaxComputeLog.java
  2. 153 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/BeanUtil.java
  3. 89 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/DataTypeEnum.java
  4. 30 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/annotation/ClickhouseColumn.java
  5. 10 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/annotation/ClickhouseTable.java
  6. 243 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java
  7. 145 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java
  8. 29 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/ClickhouseUtil.java
  9. 97 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/TunnelBatchSink.java
  10. 160 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/TunnelBatchSinkBuffer.java
  11. 91 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/TunnelBatchWriter.java
  12. 182 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostHourDM.java
  13. 90 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostMinuterDM.java
  14. 274 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java
  15. 191 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java
  16. 144 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/ClickhouseBatchStreamSink.java
  17. 39 0
      flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/CostMinuteDMStreamTrigger.java

+ 20 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/MaxComputeLog.java

@@ -0,0 +1,20 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.rest.RestClient;
+
+public class MaxComputeLog extends RestClient.RetryLogger {
+    @Override
+    public void onRetryLog(Throwable e, long retryCount, long sleepTime) {
+        if (e instanceof OdpsException) {
+            String requestId = ((OdpsException) e).getRequestId();
+            if (requestId != null) {
+                System.err.println(String.format(
+                        "Warning: ODPS request failed, requestID:%s, retryCount:%d, will retry in %d seconds.", requestId, retryCount, sleepTime));
+                return;
+            }
+        }
+        System.err.println(String.format(
+                "Warning: ODPS request failed:%s, retryCount:%d, will retry in %d seconds.", e.getMessage(), retryCount, sleepTime));
+    }
+}

+ 153 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/BeanUtil.java

@@ -0,0 +1,153 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.bean;
+
+import flink.zanxiangnet.ad.monitoring.clickhouse.bean.annotation.ClickhouseColumn;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class BeanUtil {
+    private static final Pattern humpPattern = Pattern.compile("[A-Z]");
+    private static final Pattern linePattern = Pattern.compile("_(\\w)");
+
+    /**
+     * 解析 javabean的字段
+     *
+     * @param clazz
+     * @return
+     */
+    public static List<FieldInfo> parseBeanField(Class<?> clazz) {
+        List<Field> fieldList = new ArrayList<>();
+        Class<?> tempClazz = clazz;
+        while (tempClazz != null) {
+            Field[] fields = tempClazz.getDeclaredFields();
+            if (fields.length != 0) {
+                fieldList.addAll(Arrays.asList(fields));
+            }
+            tempClazz = tempClazz.getSuperclass();
+        }
+        if (fieldList.size() == 0) {
+            throw new RuntimeException("Failed parsed class[" + clazz.getName() + "]. No Field!");
+        }
+        List<FieldInfo> fieldInfoList = new ArrayList<>(fieldList.size());
+        for (Field field : fieldList) {
+            if (Modifier.isFinal(field.getModifiers()) || Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            ClickhouseColumn annotation = field.getAnnotation(ClickhouseColumn.class);
+            FieldInfo fieldInfo = new FieldInfo();
+            fieldInfo.setField(field);
+            fieldInfo.setFieldName(field.getName());
+            try {
+                fieldInfo.setGetMethod(methodOfGet(clazz, field));
+                fieldInfo.setSetMethod(methodOfSet(clazz, field));
+            } catch (NoSuchMethodException e) {
+                throw new RuntimeException("Failed parsed class[" + clazz.getName() + "]. The get or set method of [" + field.getName() + "] was not found!");
+            }
+            if (annotation != null) {
+                if (annotation.ignore()) {
+                    continue;
+                }
+                if (annotation.isPartitioned()) {
+                    fieldInfo.setUsePartitioned(true);
+                }
+                if (StringUtils.isNotBlank(annotation.value())) {
+                    fieldInfo.setColumnName(annotation.value());
+                } else {
+                    fieldInfo.setColumnName(humpToLine(fieldInfo.getFieldName()));
+                }
+            } else {
+                fieldInfo.setColumnName(humpToLine(fieldInfo.getFieldName()));
+            }
+            fieldInfoList.add(fieldInfo);
+        }
+        return fieldInfoList;
+    }
+
+    /**
+     * 驼峰转下划线
+     *
+     * @param str
+     * @return
+     */
+    public static String humpToLine(String str) {
+        Matcher matcher = humpPattern.matcher(str);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
+    /**
+     * 下划线转驼峰
+     *
+     * @param str
+     * @return
+     */
+    public static String lineToHump(String str) {
+        str = str.toLowerCase();
+        Matcher matcher = linePattern.matcher(str);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
+    public static <T> Method methodOfGet(Class<T> clazz, Field field) throws NoSuchMethodException {
+        String fieldName = field.getName();
+        String methodName = "get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+        return clazz.getMethod(methodName);
+    }
+
+    public static <T> Method methodOfSet(Class<T> clazz, Field field) throws NoSuchMethodException {
+        String fieldName = field.getName();
+        String methodName = "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+        return clazz.getMethod(methodName, field.getType());
+    }
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @Builder
+    public static class FieldInfo {
+        /**
+         * 是否用于分
+         */
+        private boolean usePartitioned = false;
+        /**
+         * 字段名
+         */
+        private String fieldName;
+        /**
+         * 数据库中的列名
+         */
+        private String columnName;
+        /**
+         * 字段
+         */
+        private Field field;
+        /**
+         * 字段的 get方法
+         */
+        private Method getMethod;
+
+        /**
+         * 字段的 set方法
+         */
+        private Method setMethod;
+    }
+}

+ 89 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/DataTypeEnum.java

@@ -0,0 +1,89 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.bean;
+
+import lombok.Getter;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+@Getter
+public enum DataTypeEnum {
+    /**
+     * 8位有符号整型。
+     * 取值范围:-128~127。
+     */
+    TINYINT(Byte.class),
+    /**
+     * 16位有符号整型。
+     * 取值范围:-32768~32767。
+     */
+    SMALLINT(Integer.class),
+    /**
+     * 32位有符号整型。
+     * 取值范围:-2 31~2 31-1。
+     */
+    INT(Integer.class),
+    /**
+     * 64位有符号整型。
+     * 取值范围:-2 63+1~2 63 -1。
+     */
+    BIGINT(Long.class),
+    /**
+     * 二进制数据类型,目前长度限制为8MB。
+     */
+    // BINARY(Byte[].class),
+    /**
+     * 32位二进制浮点型。
+     */
+    FLOAT(Double.class),
+    /**
+     * 64位二进制浮点型。
+     */
+    DOUBLE(Double.class),
+    /**
+     * DECIMAL(precision,scale) 10进制精确数字类型。
+     * precision:表示最多可以表示多少位的数字。取值范围:1 <= precision <= 38。
+     * scale:表示小数部分的位数。取值范围:0 <= scale <= 38。
+     * 如果不指定以上两个参数,则默认为decimal(10,0)。
+     */
+    DECIMAL(BigDecimal.class),
+    /**
+     * 变长字符类型,n为长度。
+     * 取值范围:1~65535。
+     */
+    VARCHAR(String.class),
+    /**
+     * 固定长度字符类型,n为长度。最大取值255。长度不足则会填充空格,但空格不参与比较。
+     */
+    CHAR(String.class),
+    /**
+     * 字符串类型,长度限制为8 MB。
+     */
+    STRING(String.class),
+    /**
+     * 日期类型,格式为yyyy-mm-dd。
+     * 取值范围:0000-01-01~9999-12-31。
+     */
+    DATE(LocalDate.class),
+    /**
+     * 日期类型,格式为yyyy-mm-dd。
+     * 取值范围:0000-01-01~9999-12-31。
+     */
+    DATETIME(LocalDateTime.class),
+    /**
+     * 与时区无关的时间戳类型。
+     * 取值范围:0000-01-01 00:00:00.000000000~9999-12-31 23.59:59.999999999,精确到纳秒。
+     */
+    TIMESTAMP(LocalDateTime.class),
+    /**
+     * 与时区无关的时间戳类型。
+     * 取值范围:0000-01-01 00:00:00.000000000~9999-12-31 23.59:59.999999999,精确到纳秒。
+     */
+    BOOLEAN(Boolean.class);
+
+    private final Class<?> defaultJavaType;
+
+    DataTypeEnum(Class<?> javaType) {
+        this.defaultJavaType = javaType;
+    }
+}

+ 30 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/annotation/ClickhouseColumn.java

@@ -0,0 +1,30 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.bean.annotation;
+
+import java.lang.annotation.*;
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.ANNOTATION_TYPE})
+public @interface ClickhouseColumn {
+
+    /**
+     * MaxCompute中的列名
+     *
+     * @return
+     */
+    String value() default "";
+
+    /**
+     * 是否忽略该字段
+     *
+     * @return
+     */
+    boolean ignore() default false;
+
+    /**
+     * 是否是分区字段
+     *
+     * @return
+     */
+    boolean isPartitioned() default false;
+}

+ 10 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/bean/annotation/ClickhouseTable.java

@@ -0,0 +1,10 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.bean.annotation;
+
+import java.lang.annotation.*;
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
+public @interface ClickhouseTable {
+    String value();
+}

+ 243 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -0,0 +1,243 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
+    private static final Logger log = LoggerFactory.getLogger(BatchSinkHour.class);
+
+    // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
+    private static final Object DUMMY_LOCK = new Object();
+
+    private String sql;
+    private Connection connection = null;
+
+    public BatchSinkHour() {
+    }
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        super.open(config);
+        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+                "8123", "data_monitoring");
+    }
+
+    @Override
+    public void invoke(CostHourDM costhour, Context context) throws SQLException {
+        String sql = "insert into data_monitoring.cost_hour " +
+                "(dt,\n" +
+                "create_time,\n" +
+                "hour,\n" +
+                "ad_id,\n" +
+                "adgroup_id,\n" +
+                "adcreative_id,\n" +
+                "account_id,\n" +
+                "campaign_id,\n" +
+                "cost_total,\n" +
+                "cost_day,\n" +
+                "cost_hour,\n" +
+                "cost_diff,\n" +
+                "cost_last_hour,\n" +
+                "cost_last_hour_diff,\n" +
+                "cost_last_two_hour,\n" +
+                "cost_last_two_hour_diff,\n" +
+                "cost_last_three_trend,\n" +
+                "cost_speed,\n" +
+                "view_count_total,\n" +
+                "view_count_day,\n" +
+                "view_count_hour,\n" +
+                "thousand_display_price_all,\n" +
+                "thousand_display_price_day,\n" +
+                "thousand_display_price_hour,\n" +
+                "valid_click_count_total,\n" +
+                "valid_click_count_day,\n" +
+                "valid_click_count_hour,\n" +
+                "ctr_all,\n" +
+                "ctr_day,\n" +
+                "ctr_hour,\n" +
+                "cpc_all,\n" +
+                "cpc_day,\n" +
+                "cpc_hour,\n" +
+                "conversions_count_total,\n" +
+                "conversions_count_day,\n" +
+                "conversions_count_hour,\n" +
+                "conversions_cost_total,\n" +
+                "conversions_cost_day,\n" +
+                "conversions_cost_hour,\n" +
+                "conversions_rate_all,\n" +
+                "conversions_rate_day,\n" +
+                "conversions_rate_hour,\n" +
+                "first_day_order_roi_total,\n" +
+                "first_day_order_roi_day,\n" +
+                "first_day_order_roi_hour,\n" +
+                "first_day_order_amount_total,\n" +
+                "first_day_order_amount_day,\n" +
+                "first_day_order_amount_hour,\n" +
+                "first_day_order_count_total,\n" +
+                "first_day_order_count_day,\n" +
+                "first_day_order_count_hour,\n" +
+                "web_order_amount_total,\n" +
+                "web_order_amount_day,\n" +
+                "web_order_amount_hour,\n" +
+                "web_order_cost_total,\n" +
+                "web_order_cost_day,\n" +
+                "web_order_cost_hour,\n" +
+                "web_order_rate_total,\n" +
+                "web_order_rate_day,\n" +
+                "web_order_rate_hour,\n" +
+                "web_order_count_total,\n" +
+                "web_order_count_day,\n" +
+                "web_order_count_hour,\n" +
+                "order_roi_total,\n" +
+                "order_roi_day,\n" +
+                "order_roi_hour,\n" +
+                "order_unit_price_total,\n" +
+                "order_unit_price_day,\n" +
+                "order_unit_price_hour,\n" +
+                "from_follow_uv_total,\n" +
+                "from_follow_uv_day,\n" +
+                "from_follow_uv_hour,\n" +
+                "from_follow_cost_total,\n" +
+                "from_follow_cost_day,\n" +
+                "from_follow_cost_hour,\n" +
+                "from_follow_rate_total,\n" +
+                "from_follow_rate_day,\n" +
+                "from_follow_rate_hour,\n" +
+                "web_register_count_total,\n" +
+                "web_register_count_day,\n" +
+                "web_register_count_hour,\n" +
+                "web_register_uv_total,\n" +
+                "web_register_uv_day,\n" +
+                "web_register_uv_hour,\n" +
+                "web_register_cost_total,\n" +
+                "web_register_cost_day,\n" +
+                "web_register_cost_hour) values " +
+                "(?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?)";
+        System.out.println(costhour);
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        preparedStatement.setString(1, costhour.dt);
+        preparedStatement.setString(2, costhour.createTime);
+        preparedStatement.setString(3, costhour.hour);
+        preparedStatement.setString(4, costhour.adId);
+        preparedStatement.setString(5, costhour.adgroupId);
+        preparedStatement.setString(6, costhour.adcreativeId);
+        preparedStatement.setString(7, costhour.accountId);
+        preparedStatement.setString(8, costhour.campaignId);
+        preparedStatement.setLong(9, costhour.costTotal);
+        preparedStatement.setLong(10, costhour.costDay);
+        preparedStatement.setLong(11, costhour.costHour);
+        preparedStatement.setLong(12, costhour.costDiff);
+        preparedStatement.setLong(13, costhour.costLastHour);
+        preparedStatement.setLong(14, costhour.costLastHourDiff);
+        preparedStatement.setLong(15, costhour.costLastTwoHour);
+        preparedStatement.setLong(16, costhour.costLastTwoHourDiff);
+        preparedStatement.setLong(17, costhour.costLastThreeTrend);
+        preparedStatement.setDouble(18, costhour.costSpeed);
+        preparedStatement.setLong(19, costhour.viewCountTotal);
+        preparedStatement.setLong(20, costhour.viewCountDay);
+        preparedStatement.setLong(21, costhour.viewCountHour);
+        preparedStatement.setLong(22, costhour.thousandDisplayPriceAll);
+        preparedStatement.setLong(23, costhour.thousandDisplayPriceDay);
+        preparedStatement.setLong(24, costhour.thousandDisplayPriceHour);
+        preparedStatement.setLong(25, costhour.validClickCountTotal);
+        preparedStatement.setLong(26, costhour.validClickCountDay);
+        preparedStatement.setLong(27, costhour.validClickCountHour);
+        preparedStatement.setDouble(28, costhour.ctrAll);
+        preparedStatement.setDouble(29, costhour.ctrDay);
+        preparedStatement.setDouble(30, costhour.ctrHour);
+        preparedStatement.setLong(31, costhour.cpcAll);
+        preparedStatement.setLong(32, costhour.cpcDay);
+        preparedStatement.setLong(33, costhour.cpcHour);
+        preparedStatement.setLong(34, costhour.conversionsCountTotal);
+        preparedStatement.setLong(35, costhour.conversionsCountDay);
+        preparedStatement.setLong(36, costhour.conversionsCountHour);
+        preparedStatement.setLong(37, costhour.conversionsCostTotal);
+        preparedStatement.setLong(38, costhour.conversionsCostDay);
+        preparedStatement.setLong(39, costhour.conversionsCostHour);
+        preparedStatement.setDouble(40, costhour.conversionsRateAll);
+        preparedStatement.setDouble(41, costhour.conversionsRateDay);
+        preparedStatement.setDouble(42, costhour.conversionsRateHour);
+        preparedStatement.setDouble(43, costhour.firstDayOrderRoiTotal);
+        preparedStatement.setDouble(44, costhour.firstDayOrderRoiDay);
+        preparedStatement.setDouble(45, costhour.firstDayOrderRoiHour);
+        preparedStatement.setLong(46, costhour.firstDayOrderAmountTotal);
+        preparedStatement.setLong(47, costhour.firstDayOrderAmountDay);
+        preparedStatement.setLong(48, costhour.firstDayOrderAmountHour);
+        preparedStatement.setLong(49, costhour.firstDayOrderCountTotal);
+        preparedStatement.setLong(50, costhour.firstDayOrderCountDay);
+        preparedStatement.setLong(51, costhour.firstDayOrderCountHour);
+        preparedStatement.setLong(52, costhour.webOrderAmountTotal);
+        preparedStatement.setLong(53, costhour.webOrderAmountDay);
+        preparedStatement.setLong(54, costhour.webOrderAmountHour);
+        preparedStatement.setLong(55, costhour.webOrderCostTotal);
+        preparedStatement.setLong(56, costhour.webOrderCostDay);
+        preparedStatement.setLong(57, costhour.webOrderCostHour);
+        preparedStatement.setDouble(58, costhour.webOrderRateTotal);
+        preparedStatement.setDouble(59, costhour.webOrderRateDay);
+        preparedStatement.setDouble(60, costhour.webOrderRateHour);
+        preparedStatement.setLong(61, costhour.webOrderCountTotal);
+        preparedStatement.setLong(62, costhour.webOrderCountDay);
+        preparedStatement.setLong(63, costhour.webOrderCountHour);
+        preparedStatement.setDouble(64, costhour.orderRoiTotal);
+        preparedStatement.setDouble(65, costhour.orderRoiDay);
+        preparedStatement.setDouble(66, costhour.orderRoiHour);
+        preparedStatement.setLong(67, costhour.orderUnitPriceTotal);
+        preparedStatement.setLong(68, costhour.orderUnitPriceDay);
+        preparedStatement.setLong(69, costhour.orderUnitPriceHour);
+        preparedStatement.setLong(70, costhour.fromFollowUvTotal);
+        preparedStatement.setLong(71, costhour.fromFollowUvDay);
+        preparedStatement.setLong(72, costhour.fromFollowUvHour);
+        preparedStatement.setLong(73, costhour.fromFollowCostTotal);
+        preparedStatement.setLong(74, costhour.fromFollowCostDay);
+        preparedStatement.setLong(75, costhour.fromFollowCostHour);
+        preparedStatement.setDouble(76, costhour.fromFollowRateTotal);
+        preparedStatement.setDouble(77, costhour.fromFollowRateDay);
+        preparedStatement.setDouble(78, costhour.fromFollowRateHour);
+        preparedStatement.setLong(79, costhour.webRegisterCountTotal);
+        preparedStatement.setLong(80, costhour.webRegisterCountDay);
+        preparedStatement.setLong(81, costhour.webRegisterCountHour);
+        preparedStatement.setLong(82, costhour.webRegisterUvTotal);
+        preparedStatement.setLong(83, costhour.webRegisterUvDay);
+        preparedStatement.setLong(84, costhour.webRegisterUvHour);
+        preparedStatement.setDouble(85, costhour.webRegisterCostTotal);
+        preparedStatement.setDouble(86, costhour.webRegisterCostDay);
+        preparedStatement.setDouble(87, costhour.webRegisterCostHour);
+        preparedStatement.addBatch();
+        long startTime = System.currentTimeMillis();
+        int ints[] = preparedStatement.executeBatch();
+        connection.commit();
+        long endTime = System.currentTimeMillis();
+        System.out.println("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
+        //clickhouse 处理重复数据
+        //TODO:数据去重有问题,去除掉非最新的数据
+        Statement statement_duplicate = connection.createStatement();
+        String sql_duplicate = "optimize table data_monitoring.cost_hour final;";
+        statement_duplicate.executeQuery(sql_duplicate);
+        connection.commit();
+        long endTime_dp = System.currentTimeMillis();
+        System.out.println("数据清理耗时: " + (endTime_dp - endTime));
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+        super.close();
+    }
+}

+ 145 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -0,0 +1,145 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
+    private static final Logger log = LoggerFactory.getLogger(BatchSinkMinute.class);
+
+    // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
+    private static final Object DUMMY_LOCK = new Object();
+
+    private String sql;
+    private Connection connection = null;
+
+    public BatchSinkMinute() {
+
+    }
+
+    @Override
+    public void open(Configuration config) throws Exception {
+        super.open(config);
+        connection=ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+                "8123", "data_monitoring");
+    }
+
+    @Override
+    public void invoke(CostMinuterDM costMinuterDM, Context context) throws SQLException {
+        String sql = "insert into data_monitoring.cost_minute " +
+                "(dt,\n" +
+                "minute,\n" +
+                "create_time,\n" +
+                "hour,\n" +
+                "ad_id,\n" +
+                "adgroup_id,\n" +
+                "adcreative_id,\n" +
+                "account_id,\n" +
+                "campaign_id,\n" +
+                "cost_minute,\n" +
+                "cost_diff,\n" +
+                "cost_last_hour,\n" +
+                "cost_last_hour_diff,\n" +
+                "cost_last_two_hour,\n" +
+                "cost_last_two_hour_diff,\n" +
+                "cost_last_three_trend,\n" +
+                "cost_speed,\n" +
+                "view_count_minute,\n" +
+                "thousand_display_price_minute,\n" +
+                "valid_click_count_minute,\n" +
+                "ctr_minute,\n" +
+                "cpc_minute,\n" +
+                "conversions_count_minute,\n" +
+                "conversions_cost_minute,\n" +
+                "conversions_rate_minute,\n" +
+                "first_day_order_roi_minute,\n" +
+                "first_day_order_amount_minute,\n" +
+                "first_day_order_count_minute,\n" +
+                "web_order_amount_minute,\n" +
+                "web_order_cost_minute,\n" +
+                "web_order_rate_minute,\n" +
+                "web_order_count_minute,\n" +
+                "order_roi_minute,\n" +
+                "order_unit_price_minute,\n" +
+                "from_follow_uv_minute,\n" +
+                "from_follow_cost_minute,\n" +
+                "from_follow_rate_minute,\n" +
+                "web_register_count_minute,\n" +
+                "web_register_uv_minute,\n" +
+                "web_register_cost_minute) values " +
+                "(?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n" +
+                "?,?,?,?,?,?,?,?,?,?,\n)";
+        System.out.println(costMinuterDM);
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        preparedStatement.setString(1, costMinuterDM.dt);
+        preparedStatement.setString(2, costMinuterDM.minute);
+        preparedStatement.setString(3, costMinuterDM.createTime);
+        preparedStatement.setString(4, costMinuterDM.hour);
+        preparedStatement.setString(5, costMinuterDM.adId);
+        preparedStatement.setString(6, costMinuterDM.adgroupId);
+        preparedStatement.setString(7, costMinuterDM.adcreativeId);
+        preparedStatement.setString(8, costMinuterDM.accountId);
+        preparedStatement.setString(9, costMinuterDM.campaignId);
+        preparedStatement.setLong(10, costMinuterDM.costMinute);
+        preparedStatement.setLong(11, costMinuterDM.costDiff);
+        preparedStatement.setLong(12, costMinuterDM.costLastHour);
+        preparedStatement.setLong(13, costMinuterDM.costLastHourDiff);
+        preparedStatement.setLong(14, costMinuterDM.costLastTwoHour);
+        preparedStatement.setLong(15, costMinuterDM.costLastTwoHourDiff);
+        preparedStatement.setLong(16, costMinuterDM.costLastThreeTrend);
+        preparedStatement.setDouble(17, costMinuterDM.costSpeed);
+        preparedStatement.setLong(18, costMinuterDM.viewCountMinute);
+        preparedStatement.setLong(19, costMinuterDM.thousandDisplayPriceMinute);
+        preparedStatement.setLong(20, costMinuterDM.validClickCountMinute);
+        preparedStatement.setDouble(21, costMinuterDM.ctrMinute);
+        preparedStatement.setLong(22, costMinuterDM.cpcMinute);
+        preparedStatement.setLong(23, costMinuterDM.conversionsCountMinute);
+        preparedStatement.setLong(24, costMinuterDM.conversionsCostMinute);
+        preparedStatement.setDouble(25, costMinuterDM.conversionsRateMinute);
+        preparedStatement.setDouble(26, costMinuterDM.firstDayOrderRoiMinute);
+        preparedStatement.setLong(27, costMinuterDM.firstDayOrderAmountMinute);
+        preparedStatement.setLong(28, costMinuterDM.firstDayOrderCountMinute);
+        preparedStatement.setLong(29, costMinuterDM.webOrderAmountMinute);
+        preparedStatement.setLong(30, costMinuterDM.webOrderCostMinute);
+        preparedStatement.setDouble(31, costMinuterDM.webOrderRateMinute);
+        preparedStatement.setLong(32, costMinuterDM.webOrderCountMinute);
+        preparedStatement.setDouble(33, costMinuterDM.orderRoiMinute);
+        preparedStatement.setLong(34, costMinuterDM.orderUnitPriceMinute);
+        preparedStatement.setLong(35, costMinuterDM.fromFollowUvMinute);
+        preparedStatement.setLong(36, costMinuterDM.fromFollowCostMinute);
+        preparedStatement.setDouble(37, costMinuterDM.fromFollowRateMinute);
+        preparedStatement.setLong(38, costMinuterDM.webRegisterCountMinute);
+        preparedStatement.setLong(39, costMinuterDM.webRegisterUvMinute);
+        preparedStatement.setDouble(40, costMinuterDM.webRegisterCostMinute);
+
+        preparedStatement.addBatch();
+        long startTime = System.currentTimeMillis();
+        int ints[] = preparedStatement.executeBatch();
+        connection.commit();
+        long endTime = System.currentTimeMillis();
+        System.out.println("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
+        //TODO:数据去重有问题,去除掉非最新的数据
+        Statement statement_duplicate = connection.createStatement();
+        String sql_duplicate = "optimize table data_monitoring.cost_minute final;";
+        statement_duplicate.executeQuery(sql_duplicate);
+        connection.commit();
+        long endTime_dp = System.currentTimeMillis();
+        System.out.println("数据清理耗时: " + (endTime_dp - endTime));
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+        super.close();
+    }
+}

+ 29 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/ClickhouseUtil.java

@@ -0,0 +1,29 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
+
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class ClickhouseUtil {
+    private static Connection connection;
+
+    public static Connection getConn(String host, String port, String database) throws ClassNotFoundException, SQLException {
+        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
+
+        ClickHouseProperties properties = new ClickHouseProperties();
+        String user = "qc";
+        String password = "Qc_123456";
+
+        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
+        connection = DriverManager.getConnection(address, user, password);
+        return connection;
+    }
+
+
+    public void close() throws SQLException {
+        connection.close();
+    }
+
+}

+ 97 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/TunnelBatchSink.java

@@ -0,0 +1,97 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import flink.zanxiangnet.ad.monitoring.clickhouse.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.clickhouse.bean.BeanUtil;
+import flink.zanxiangnet.ad.monitoring.clickhouse.bean.annotation.ClickhouseTable;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class TunnelBatchSink<IN> extends RichSinkFunction<IN> {
+    private static final Logger log = LoggerFactory.getLogger(TunnelBatchSink.class);
+
+    // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
+    private static final Object DUMMY_LOCK = new Object();
+
+    private final Class<IN> clazz;
+    // 缓存刷新的间隔时间
+    private final Long bufferRefreshTime;
+    // 缓存的最大数据量
+    private final Long maxBufferCount;
+    // 可能同时存在的分区数
+    private final Integer partitionCount;
+
+    private volatile transient TunnelBatchSinkBuffer<IN> sinkBuffer;
+
+    public TunnelBatchSink(Class<IN> clazz, Long bufferRefreshTime, Long maxBufferCount, Integer partitionCount) {
+        this.clazz = clazz;
+        this.bufferRefreshTime = bufferRefreshTime;
+        this.maxBufferCount = maxBufferCount;
+        this.partitionCount = partitionCount;
+    }
+
+    @Override
+    public void open(Configuration config) {
+        if (sinkBuffer == null) {
+            synchronized (DUMMY_LOCK) {
+                if (sinkBuffer == null) {
+                    Map<String, String> params = getRuntimeContext()
+                            .getExecutionConfig()
+                            .getGlobalJobParameters()
+                            .toMap();
+                    ClickhouseTable tableAnnotation = clazz.getAnnotation(ClickhouseTable.class);
+
+                    Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
+                            params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
+                    Odps odps = new Odps(account);
+                    odps.getRestClient().setRetryLogger(new MaxComputeLog());
+                    odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
+                    odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+                    TableTunnel tunnel = new TableTunnel(odps);
+                    tunnel.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT));
+                    Table table = odps.tables().get(tableAnnotation.value());
+                    List<BeanUtil.FieldInfo> fieldInfoList = BeanUtil.parseBeanField(clazz);
+                    sinkBuffer = new TunnelBatchSinkBuffer<>(bufferRefreshTime,
+                            maxBufferCount,
+                            partitionCount,
+                            tunnel,
+                            params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME),
+                            table,
+                            fieldInfoList);
+                }
+            }
+        }
+    }
+
+    /**
+     * 将值写入到 Sink。每个值都会调用此函数
+     *
+     * @param value
+     * @param context
+     */
+    @Override
+    public void invoke(IN value, Context context) {
+        sinkBuffer.put(value);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (sinkBuffer != null) {
+            synchronized (DUMMY_LOCK) {
+                sinkBuffer.close();
+            }
+        }
+
+        super.close();
+    }
+}

+ 160 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/TunnelBatchSinkBuffer.java

@@ -0,0 +1,160 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
+
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import flink.zanxiangnet.ad.monitoring.clickhouse.bean.BeanUtil;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+/**
+ * 缓存要写入 MaxCompute的数据
+ */
+public class TunnelBatchSinkBuffer<T> {
+    private static final Logger log = LoggerFactory.getLogger(TunnelBatchSinkBuffer.class);
+
+    private static final String PART = "PART";
+    private static final String NO_PART = "NO_PART";
+
+    private final TableTunnel tunnel;
+    private final String projectName;
+    private final Table tableInfo;
+
+    private final Long buffRefreshTime;
+    private final Long maxBuffer;
+
+    private final ConcurrentHashMap<String, List<T>> partitionData;
+    private final Map<String, Method> partitionFieldMethods;
+    private final List<BeanUtil.FieldInfo> fieldInfoList;
+
+    private boolean isRun = false;
+    private Thread coreThread;
+    private final ExecutorService threadPool;
+
+    public TunnelBatchSinkBuffer(
+            Long buffRefreshTime,
+            Long maxBuffer,
+            Integer partitionCount,
+            TableTunnel tunnel,
+            String projectName,
+            Table tableInfo,
+            List<BeanUtil.FieldInfo> fieldInfoList
+    ) {
+        this.tunnel = tunnel;
+        this.projectName = projectName;
+        this.tableInfo = tableInfo;
+        this.buffRefreshTime = buffRefreshTime;
+        this.maxBuffer = maxBuffer;
+        this.fieldInfoList = fieldInfoList;
+        this.partitionData = new ConcurrentHashMap<>(partitionCount);
+
+        this.partitionFieldMethods = fieldInfoList.stream().filter(BeanUtil.FieldInfo::isUsePartitioned)
+                .collect(Collectors.toMap(BeanUtil.FieldInfo::getColumnName, BeanUtil.FieldInfo::getGetMethod));
+        threadPool = new ThreadPoolExecutor(
+                partitionCount,
+                partitionCount,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder()
+                        .setNameFormat("maxcompute-writer-%d").build());
+        start();
+    }
+
+    /**
+     * 写入数据
+     *
+     * @param t 数据
+     */
+    public void put(T t) {
+        StringBuilder partition;
+        if (CollectionUtils.isEmpty(partitionFieldMethods)) {
+            partition = new StringBuilder(NO_PART);
+        } else {
+            partition = new StringBuilder(PART);
+            for (Map.Entry<String, Method> entry : partitionFieldMethods.entrySet()) {
+                partition.append(entry.getKey()).append("=");
+                try {
+                    partition.append(entry.getValue().invoke(t));
+                } catch (InvocationTargetException | IllegalAccessException e) {
+                    // 获取分区字段的值失败
+                    log.error(e.getMessage(), e);
+                    throw new RuntimeException("Failed get partition field value!");
+                }
+                partition.append(",");
+            }
+            partition = new StringBuilder(partition.substring(0, partition.length() - 1));
+        }
+        List<T> list = partitionData.computeIfAbsent(partition.toString(), key -> new ArrayList<>());
+        list.add(t);
+    }
+
+    public void close() {
+        isRun = false;
+        threadPool.shutdown();
+        coreThread.interrupt();
+    }
+
+    private void start() {
+        coreThread = new Thread(() -> {
+            Map<String, Long> lastSaveTime = new HashMap<>();
+            // 开启一个定时轮询缓存数据的线程
+            while (isRun) {
+                try {
+                    if (CollectionUtils.isEmpty(partitionData)) {
+                        // 没有数据,等 5s
+                        try {
+                            Thread.sleep(5000L);
+                        } catch (InterruptedException e) {
+                            Thread.sleep(5000L);
+                        }
+                        continue;
+                    }
+                    long now = System.currentTimeMillis();
+                    List<String> partitionKeys = new ArrayList<>(partitionData.size());
+                    for (Map.Entry<String, List<T>> entry : partitionData.entrySet()) {
+                        Long lastSaveTimeOfKey = lastSaveTime.computeIfAbsent(entry.getKey(), key -> now);
+                        boolean mustSave = now - lastSaveTimeOfKey > buffRefreshTime;
+                        if (mustSave || entry.getValue().size() > maxBuffer) {
+                            partitionKeys.add(entry.getKey());
+                        }
+                    }
+                    if (CollectionUtils.isEmpty(partitionKeys)) {
+                        Thread.sleep(5000L);
+                        continue;
+                    }
+
+                    for (String partitionKey : partitionKeys) {
+                        List<T> dataList = partitionData.remove(partitionKey);
+                        lastSaveTime.remove(partitionKey);
+
+                        PartitionSpec part = null;
+                        if (partitionKey.startsWith(PART)) {
+                            part = new PartitionSpec(partitionKey.substring(PART.length()));
+                            // 尽然不会自己建分区!!!
+                            // 创建单个分区大概 1~5秒(大部分在 2、3秒)
+                            tableInfo.createPartition(part, true);
+                        }
+                        threadPool.submit(new TunnelBatchWriter<>(tunnel, projectName, tableInfo.getName(), part, dataList, fieldInfoList));
+                    }
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+        });
+        isRun = true;
+        coreThread.start();
+    }
+}

+ 91 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/TunnelBatchWriter.java

@@ -0,0 +1,91 @@
+package flink.zanxiangnet.ad.monitoring.clickhouse.sink;
+
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.RecordWriter;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import flink.zanxiangnet.ad.monitoring.clickhouse.bean.BeanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+/**
+ * 实际执行 MaxCompute写入逻辑
+ */
+public class TunnelBatchWriter<T> implements Runnable {
+    private static final Logger log = LoggerFactory.getLogger(TunnelBatchWriter.class);
+
+    private final TableTunnel tunnel;
+    private final String projectName;
+    private final String tableName;
+    private final PartitionSpec part;
+    private final List<T> dataList;
+    private final List<BeanUtil.FieldInfo> fieldInfoList;
+
+    public TunnelBatchWriter(TableTunnel tunnel,
+                             String projectName,
+                             String tableName,
+                             PartitionSpec part,
+                             List<T> dataList,
+                             List<BeanUtil.FieldInfo> fieldInfoList
+    ) {
+        this.tunnel = tunnel;
+        this.projectName = projectName;
+        this.tableName = tableName;
+        this.part = part;
+        this.dataList = dataList;
+        this.fieldInfoList = fieldInfoList;
+    }
+
+    @Override
+    public void run() {
+        if (CollectionUtils.isEmpty(dataList)) {
+            return;
+        }
+        int retry = 0;
+        do {
+            try {
+                if (retry > 0) {
+                    log.error("重试:" + retry);
+                }
+                // 创建 session大概要 200多毫秒
+                TableTunnel.UploadSession uploadSession = part == null ? tunnel.createUploadSession(projectName, tableName)
+                        : tunnel.createUploadSession(projectName, tableName, part);
+                long blockId = 0;
+                RecordWriter writer = uploadSession.openRecordWriter(blockId);
+                for (T t : dataList) {
+                    Record record = uploadSession.newRecord();
+                    for (BeanUtil.FieldInfo fieldInfo : fieldInfoList) {
+                        if (fieldInfo.isUsePartitioned()) {
+                            // 分区字段不在这里设值
+                            continue;
+                        }
+                        Object obj = fieldInfo.getGetMethod().invoke(t);
+                        record.set(fieldInfo.getColumnName(), obj);
+                    }
+                    writer.write(record);
+                }
+                writer.close();
+                uploadSession.commit(new Long[]{blockId});
+                break;
+            } catch (IOException | InvocationTargetException | IllegalAccessException | TunnelException e) {
+                log.error(e.getMessage(), e);
+                if (++retry >= 3) {
+                    log.error("重试失败");
+                    throw new RuntimeException("Failed writer to MaxCompute!!! Error msg: " + e.getMessage());
+                }
+            } catch (Exception e) {
+                if (++retry >= 3) {
+                    log.error("重试失败");
+                    log.error(e.getMessage(), e);
+                    throw new RuntimeException("Failed writer to MaxCompute!!! Error msg: " + e.getMessage());
+                }
+            }
+        } while (true);
+    }
+}

+ 182 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostHourDM.java

@@ -0,0 +1,182 @@
+package flink.zanxiangnet.ad.monitoring.pojo.entity;
+
+import lombok.Data;
+
+@Data
+public class CostHourDM {
+    //时间-天
+    public String dt;
+    //真实时间
+    public String createTime;
+    //计划 id
+    public String campaignId;
+    //时间-小时
+    public String hour;
+    //广告id
+    public String adId;
+    //广告组id
+    public String adgroupId;
+    //创意id
+    public String adcreativeId;
+    //账号id
+    public String accountId;
+    //总消耗
+    public long costTotal;
+    //当天消耗
+    public long costDay;
+    //当天小时消耗
+    public long costHour;
+    //当前小时与前一小时消耗差额
+    public long costDiff;
+    //前一小时消耗金额
+    public long costLastHour;
+    //前一小时与前二小时消耗差额
+    public long costLastHourDiff;
+    //前二小时消耗金额
+    public long costLastTwoHour;
+    //前二小时与前三小时消耗差额
+    public long costLastTwoHourDiff;
+    //前三小时消耗趋势
+    public long costLastThreeTrend;
+    //消耗速度
+    public double costSpeed;
+    //总浏览量
+    public long viewCountTotal;
+    //天-总浏览量
+    public long viewCountDay;
+    //小时-总浏览量
+    public long viewCountHour;
+    //总平均千次曝光成本
+    public long thousandDisplayPriceAll;
+    //天-总平均曝光成本
+    public long thousandDisplayPriceDay;
+    //小时-总平均曝光成本
+    public long thousandDisplayPriceHour;
+    //总点击量
+    public long validClickCountTotal;
+    //天-总点击量
+    public long validClickCountDay;
+    //小时-总点击量
+    public long validClickCountHour;
+    //总平均点击率
+    public double ctrAll;
+    //天-总平均点击率
+    public double ctrDay;
+    //小时-总平均点击率
+    public double ctrHour;
+    //总点击均价
+    public long cpcAll;
+    //天-总点击均价
+    public long cpcDay;
+    //小时-总点击均价
+    public long cpcHour;
+    //总目标转化量
+    public long conversionsCountTotal;
+    //天-总目标转化量
+    public long conversionsCountDay;
+    //小时-总目标转化量
+    public long conversionsCountHour;
+    //总目标平均转化成本
+    public long conversionsCostTotal;
+    //天-总目标平均转化成本
+    public long conversionsCostDay;
+    //小时-总目标平均转化成本
+    public long conversionsCostHour;
+    //总平均转化率
+    public double conversionsRateAll;
+    //天-总平均转化率
+    public double conversionsRateDay;
+    //小时-总平均转化率
+    public double conversionsRateHour;
+    //总首日下单roi
+    public double firstDayOrderRoiTotal;
+    //天-总首日下单roi
+    public double firstDayOrderRoiDay;
+    //小时-总首日下单roi
+    public double firstDayOrderRoiHour;
+    //总首日下单金额
+    public long firstDayOrderAmountTotal;
+    //天-总首日下单金额
+    public long firstDayOrderAmountDay;
+    //小时-总首日下单金额
+    public long firstDayOrderAmountHour;
+    //总首日下单量
+    public long firstDayOrderCountTotal;
+    //天-总首日下单量
+    public long firstDayOrderCountDay;
+    //小时-总首日下单量
+    public long firstDayOrderCountHour;
+    //总下单金额
+    public long webOrderAmountTotal;
+    //天-总下单金额
+    public long webOrderAmountDay;
+    //小时-总下单金额
+    public long webOrderAmountHour;
+    //总平均下单成本
+    public long webOrderCostTotal;
+    //天-总平均下单成本
+    public long webOrderCostDay;
+    //小时-总平均下单成本
+    public long webOrderCostHour;
+    //总平均下单率
+    public double webOrderRateTotal;
+    //天-总平均下单率
+    public double webOrderRateDay;
+    //小时-总平均下单率
+    public double webOrderRateHour;
+    //总平均下单量
+    public long webOrderCountTotal;
+    //天-总平均下单量
+    public long webOrderCountDay;
+    //小时-总平均下单量
+    public long webOrderCountHour;
+    //总下单ROI
+    public double orderRoiTotal;
+    //天-总下单roi
+    public double orderRoiDay;
+    //小时-总下单roi
+    public double orderRoiHour;
+    //总平均下单客单价
+    public long orderUnitPriceTotal;
+    //天-总平均下单客单价
+    public long orderUnitPriceDay;
+    //小时-总平均下单客单价
+    public long orderUnitPriceHour;
+    //总公众号关注量
+    public long fromFollowUvTotal;
+    //天-总公众号关注量
+    public long fromFollowUvDay;
+    //小时-总公众号关注量
+    public long fromFollowUvHour;
+    //总平均公众号关注成本
+    public long fromFollowCostTotal;
+    //天-总平均公众号关注成本
+    public long fromFollowCostDay;
+    //小时-总平均公众号关注成本
+    public long fromFollowCostHour;
+    //总平均公众号关注率
+    public double fromFollowRateTotal;
+    //天-总平均公众号关注率
+    public double fromFollowRateDay;
+    //小时-总平均公众号关注率
+    public double fromFollowRateHour;
+    //总注册数
+    public long webRegisterCountTotal;
+    //天-总注册数
+    public long webRegisterCountDay;
+    //小时-总注册数
+    public long webRegisterCountHour;
+    //总注册人数
+    public long webRegisterUvTotal;
+    //天-总注册人数
+    public long webRegisterUvDay;
+    //小时-总注册人数
+    public long webRegisterUvHour;
+    //总平均注册成本
+    public double webRegisterCostTotal;
+    //天-总平均注册成本
+    public double webRegisterCostDay;
+    //小时-总平均注册成本
+    public double webRegisterCostHour;
+
+}

+ 90 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/pojo/entity/CostMinuterDM.java

@@ -0,0 +1,90 @@
+package flink.zanxiangnet.ad.monitoring.pojo.entity;
+
+import lombok.Data;
+
+@Data
+public class CostMinuterDM {
+    //时间-天
+    public String dt;
+    //真实时间
+    public String createTime;
+    //计划 id
+    public String campaignId;
+    //时间-小时
+    public String hour;
+    //时间-分钟
+    public String minute;
+    //时间-真实
+    public String createtime;
+    //广告id
+    public String adId;
+    //广告组id
+    public String adgroupId;
+    //创意id
+    public String adcreativeId;
+    //账号id
+    public String accountId;
+    //当前5分钟消耗金额
+    public long costMinute;
+    //当前小时与前一小时消耗差额
+    public long costDiff;
+    //前一小时消耗金额
+    public long costLastHour;
+    //前一小时与前二小时消耗差额
+    public long costLastHourDiff;
+    //前二小时消耗金额
+    public long costLastTwoHour;
+    //前二小时与前三小时消耗差额
+    public long costLastTwoHourDiff;
+    //前三小时消耗趋势
+    public long costLastThreeTrend;
+    //消耗速度
+    public double costSpeed;
+    //分钟-总浏览量
+    public long viewCountMinute;
+    //分钟-总平均曝光成本
+    public long thousandDisplayPriceMinute;
+    //分钟-总点击量
+    public long validClickCountMinute;
+    //分钟-总平均点击率
+    public double ctrMinute;
+    //分钟-总点击均价
+    public long cpcMinute;
+    //分钟-总目标转化量
+    public long conversionsCountMinute;
+    //分钟-总目标平均转化成本
+    public long conversionsCostMinute;
+    //分钟-总平均转化率
+    public double conversionsRateMinute;
+    //分钟-总首日下单roi
+    public double firstDayOrderRoiMinute;
+    //分钟-总首日下单金额
+    public long firstDayOrderAmountMinute;
+    //分钟-总首日下单量
+    public long firstDayOrderCountMinute;
+    //分钟-总下单金额
+    public long webOrderAmountMinute;
+    //分钟-总平均下单成本
+    public long webOrderCostMinute;
+    //分钟-总平均下单率
+    public double webOrderRateMinute;
+    //分钟-总平均下单量
+    public long webOrderCountMinute;
+    //分钟-总下单roi
+    public double orderRoiMinute;
+    //分钟-总平均下单客单价
+    public long orderUnitPriceMinute;
+    //分钟-总公众号关注率
+    public long fromFollowUvMinute;
+    //分钟-总平均公众号关注成本
+    public long fromFollowCostMinute;
+    //分钟-总平均公众号关注率
+    public double fromFollowRateMinute;
+    //分钟-总注册数
+    public long webRegisterCountMinute;
+    //分钟-总注册人数
+    public long webRegisterUvMinute;
+    //分钟-总平均注册成本
+    public double webRegisterCostMinute;
+
+}

+ 274 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -0,0 +1,274 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class CostHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow> {
+    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
+    private Connection connection = null;
+    private int minutenow = 1;
+
+
+    @Override
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException {
+        ClickhouseUtil clickhouseUtil = new ClickhouseUtil();
+        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+                "8123", "data_monitoring");
+    }
+
+    //数据格式转换
+    public CostHourDM datachange(AdStatOfMinuteDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
+        //时间-天
+        costHourDM.dt = adStatOfMinuteDWD.getStatDay();
+        //计划 id
+        costHourDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
+        //时间- real
+        costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
+        //时间-小时
+        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(new Date(adStatOfMinuteDWD.getStatTime()));
+        costHourDM.hour = tmpHour;
+        //广告id
+        costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        //广告组id
+        costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
+        //创意id
+        costHourDM.adcreativeId = "";
+        //账号id
+        costHourDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
+        //总消耗
+        costHourDM.costTotal = adStatOfMinuteDWD.getCostTotal();
+        //当天消耗
+        costHourDM.costDay = adStatOfMinuteDWD.getCostDay();
+        //当天小时消耗
+        costHourDM.costHour = adStatOfMinuteDWD.getCostHour();
+        //消耗速度
+        costHourDM.costSpeed = this.minutenow == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / this.minutenow;
+        //总浏览量
+        costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
+        //天-总浏览量
+        costHourDM.viewCountDay = adStatOfMinuteDWD.getViewCountDay();
+        //小时-总浏览量
+        costHourDM.viewCountHour = adStatOfMinuteDWD.getViewCountHour();
+        //总平均千次曝光成本
+        costHourDM.thousandDisplayPriceAll = adStatOfMinuteDWD.getThousandDisplayPriceAll();
+        //天-总平均曝光成本
+        costHourDM.thousandDisplayPriceDay = adStatOfMinuteDWD.getThousandDisplayPriceDay();
+        //小时-总平均曝光成本
+        costHourDM.thousandDisplayPriceHour = adStatOfMinuteDWD.getThousandDisplayPriceHour();
+        //总点击量
+        costHourDM.validClickCountTotal = adStatOfMinuteDWD.getValidClickCountTotal();
+        //天-总点击量
+        costHourDM.validClickCountDay = adStatOfMinuteDWD.getValidClickCountDay();
+        //小时-总点击量
+        costHourDM.validClickCountHour = adStatOfMinuteDWD.getValidClickCountHour();
+        //总平均点击率
+        costHourDM.ctrAll = adStatOfMinuteDWD.getCtrAll();
+        //天-总平均点击率
+        costHourDM.ctrDay = adStatOfMinuteDWD.getCtrDay();
+        //小时-总平均点击率
+        costHourDM.ctrHour = adStatOfMinuteDWD.getCtrHour();
+        //总点击均价
+        costHourDM.cpcAll = adStatOfMinuteDWD.getCpcAll();
+        //天-总点击均价
+        costHourDM.cpcDay = adStatOfMinuteDWD.getCpcDay();
+        //小时-总点击均价
+        costHourDM.cpcHour = adStatOfMinuteDWD.getCpcHour();
+        //总目标转化量
+        costHourDM.conversionsCountTotal = adStatOfMinuteDWD.getConversionsCountTotal();
+        //天-总目标转化量
+        costHourDM.conversionsCountDay = adStatOfMinuteDWD.getConversionsCountDay();
+        //小时-总目标转化量
+        costHourDM.conversionsCountHour = adStatOfMinuteDWD.getConversionsCountHour();
+        //总目标平均转化成本
+        costHourDM.conversionsCostTotal = adStatOfMinuteDWD.getConversionsCostAll();
+        //天-总目标平均转化成本
+        costHourDM.conversionsCostDay = adStatOfMinuteDWD.getConversionsCostDay();
+        //小时-总目标平均转化成本
+        costHourDM.conversionsCostHour = adStatOfMinuteDWD.getConversionsCostHour();
+        //总平均转化率
+        costHourDM.conversionsRateAll = adStatOfMinuteDWD.getConversionsRateAll();
+        //天-总平均转化率
+        costHourDM.conversionsRateDay = adStatOfMinuteDWD.getConversionsRateDay();
+        //小时-总平均转化率
+        costHourDM.conversionsRateHour = adStatOfMinuteDWD.getConversionsRateHour();
+        //TODO:总首日下单roi
+        costHourDM.firstDayOrderRoiTotal = 0;
+        //天-总首日下单roi
+        costHourDM.firstDayOrderRoiDay = 0;
+        //小时-总首日下单roi
+        costHourDM.firstDayOrderRoiHour = 0;
+        //总首日下单金额
+        costHourDM.firstDayOrderAmountTotal = adStatOfMinuteDWD.getFirstDayOrderAmountTotal();
+        //天-总首日下单金额
+        costHourDM.firstDayOrderAmountDay = adStatOfMinuteDWD.getFirstDayOrderAmountDay();
+        //小时-总首日下单金额
+        costHourDM.firstDayOrderAmountHour = adStatOfMinuteDWD.getFirstDayOrderAmountHour();
+        //总首日下单量
+        costHourDM.firstDayOrderCountTotal = adStatOfMinuteDWD.getFirstDayOrderCountTotal();
+        //天-总首日下单量
+        costHourDM.firstDayOrderCountDay = adStatOfMinuteDWD.getFirstDayOrderCountDay();
+        //小时-总首日下单量
+        costHourDM.firstDayOrderCountHour = adStatOfMinuteDWD.getFirstDayOrderCountHour();
+        //总下单金额
+        costHourDM.webOrderAmountTotal = adStatOfMinuteDWD.getOrderAmountTotal();
+        //天-总下单金额
+        costHourDM.webOrderAmountDay = adStatOfMinuteDWD.getOrderAmountDay();
+        //小时-总下单金额
+        costHourDM.webOrderAmountHour = adStatOfMinuteDWD.getOrderAmountHour();
+        //总平均下单成本
+        costHourDM.webOrderCostTotal = adStatOfMinuteDWD.getWebOrderCostAll();
+        //天-总平均下单成本
+        costHourDM.webOrderCostDay = adStatOfMinuteDWD.getWebOrderCostDay();
+        //小时-总平均下单成本
+        costHourDM.webOrderCostHour = adStatOfMinuteDWD.getWebOrderCostHour();
+        //总平均下单率
+        costHourDM.webOrderRateTotal = adStatOfMinuteDWD.getOrderRateAll();
+        //天-总平均下单率
+        costHourDM.webOrderRateDay = adStatOfMinuteDWD.getOrderRateDay();
+        //小时-总平均下单率
+        costHourDM.webOrderRateHour = adStatOfMinuteDWD.getOrderRateHour();
+        //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
+        costHourDM.webOrderCountTotal = adStatOfMinuteDWD.getOrderCountTotal();
+        //天-总平均下单量
+        costHourDM.webOrderCountDay = adStatOfMinuteDWD.getOrderCountDay();
+        //小时-总平均下单量
+        costHourDM.webOrderCountHour = adStatOfMinuteDWD.getOrderCountHour();
+        //总下单ROI
+        costHourDM.orderRoiTotal = adStatOfMinuteDWD.getOrderRoiAll();
+        //天-总下单roi
+        costHourDM.orderRoiDay = adStatOfMinuteDWD.getOrderRoiDay();
+        //小时-总下单roi
+        costHourDM.orderRoiHour = adStatOfMinuteDWD.getOrderRoiHour();
+        //总平均下单客单价
+        costHourDM.orderUnitPriceTotal = adStatOfMinuteDWD.getOrderUnitPriceAll();
+        //天-总平均下单客单价
+        costHourDM.orderUnitPriceDay = adStatOfMinuteDWD.getOrderUnitPriceDay();
+        //小时-总平均下单客单价
+        costHourDM.orderUnitPriceHour = adStatOfMinuteDWD.getOrderUnitPriceHour();
+        //总公众号关注量
+        costHourDM.fromFollowUvTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        //天-总公众号关注量
+        costHourDM.fromFollowUvDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        //小时-总公众号关注量
+        costHourDM.fromFollowUvHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
+        //TODO:总平均公众号关注成本---是否是价格/关注
+        costHourDM.fromFollowCostTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        //天-总平均公众号关注成本
+        costHourDM.fromFollowCostDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        //小时-总平均公众号关注成本
+        costHourDM.fromFollowCostHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour() == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
+        //TODO:总平均公众号关注率----确认是否对应
+        costHourDM.fromFollowRateTotal = adStatOfMinuteDWD.getOfficialAccountFollowRateAll();
+        //天-总平均公众号关注率
+        costHourDM.fromFollowRateDay = adStatOfMinuteDWD.getOfficialAccountFollowRateDay();
+        //小时-总平均公众号关注率
+        costHourDM.fromFollowRateHour = adStatOfMinuteDWD.getOfficialAccountFollowRateHour();
+        //TODO:总注册数-----下面全是有问题的
+        costHourDM.webRegisterCountTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        //天-总注册数
+        costHourDM.webRegisterCountDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        //小时-总注册数
+        costHourDM.webRegisterCountHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
+        //总注册人数
+        costHourDM.webRegisterUvTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        //天-总注册人数
+        costHourDM.webRegisterUvDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        //小时-总注册人数
+        costHourDM.webRegisterUvHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
+        //总平均注册成本
+        costHourDM.webRegisterCostTotal = adStatOfMinuteDWD.getOfficialAccountRegisterCostAll();
+        //天-总平均注册成本
+        costHourDM.webRegisterCostDay = adStatOfMinuteDWD.getOfficialAccountRegisterCostDay();
+        //小时-总平均注册成本
+        costHourDM.webRegisterCostHour = adStatOfMinuteDWD.getOfficialAccountRegisterCostHour();
+        return costHourDM;
+    }
+
+
+    @Override
+    public void process(Long elementCount, ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context,
+                        Iterable<AdStatOfMinuteDWD> iterable, Collector<CostHourDM> collector) throws Exception {
+        long beginTime = context.window().getStart();
+        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
+        LocalDate beginDate = beginDateTime.toLocalDate();
+        //当前天
+        String statDay = DateUtil.formatLocalDate(beginDate);
+        System.out.println(statDay);
+        //当前小时
+        int hourInt = beginDateTime.getHour();
+        this.minutenow = beginDateTime.getMinute();
+        String hour = beginDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastHour = beginDateTime.minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastTwoHour = beginDateTime.minusHours(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastThreeHour = beginDateTime.minusHours(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+
+        long now = System.currentTimeMillis();
+
+
+        //获取前几分钟
+
+
+        List<AdStatOfMinuteDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
+
+        for (AdStatOfMinuteDWD adStatOfMinuteDWD : iterable) {
+            adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
+            CostHourDM costHourDM = new CostHourDM();
+            if (adStatOfMinuteDWD.getHour() != hourInt) {
+                continue;
+            }
+
+            String adId = adStatOfMinuteDWD.getAdId().toString();
+            String sql = "select " +
+                    "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
+                    "if(hour='" + lastTwoHour + "',cost_hour,0) last_two_hour_cost, " +
+                    "if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0) cost_last_hour_diff, " +
+                    "(if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0))*(if(hour='" + lastTwoHour + "',cost_hour,0) - if(hour='" + lastThreeHour + "',cost_hour,0)) cost_last_three_trend " +
+                    "from data_monitoring.cost_hour ch " +
+                    "where dt='" + statDay + "' and ad_id='" + adId + "' ";
+
+            System.out.println(sql);
+            Statement statement = connection.createStatement();
+            ResultSet rs = statement.executeQuery(sql);
+            while (rs.next()) {
+                costHourDM.costLastHour = rs.getLong(1);
+                costHourDM.costLastTwoHour = rs.getLong(2);
+                costHourDM.costLastHourDiff = rs.getLong(3);
+                costHourDM.costLastThreeTrend = rs.getLong(4);
+            }
+
+            CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
+
+            collector.collect(costHourDM_new);
+            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
+        }
+        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
+
+
+    }
+
+    @Override
+    public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context) throws Exception {
+        System.out.println("窗口关闭");
+    }
+
+}

+ 191 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteProcess.java

@@ -0,0 +1,191 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
+import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import flink.zanxiangnet.ad.monitoring.util.JsonUtil;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class CostMinuteProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow> {
+    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
+    private Connection connection = null;
+    private int minutenow = 1;
+
+    @Override
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException {
+        ClickhouseUtil clickhouseUtil = new ClickhouseUtil();
+        connection = ClickhouseUtil.getConn("cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+                "8123", "data_monitoring");
+    }
+
+    //数据格式转换
+    public CostMinuterDM datachange(AdStatOfMinuteDWD adStatOfMinuteDWD, CostMinuterDM costMinuterDM) {
+
+        //时间-天
+        costMinuterDM.dt = adStatOfMinuteDWD.getStatDay();
+        //计划 id
+        costMinuterDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
+        //时间- real
+        costMinuterDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
+
+        //时间-小时
+        String tmpHour = new SimpleDateFormat("yyyy-MM-dd HH:00:00").format(new Date(adStatOfMinuteDWD.getStatTime()));
+        costMinuterDM.hour = tmpHour;
+        //时间-分钟
+        String tmpMinute = new SimpleDateFormat("yyyy-MM-dd HH:mm:00").format(new Date(adStatOfMinuteDWD.getStatTime()));
+        costMinuterDM.minute = tmpMinute;
+        //广告id
+        costMinuterDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        //广告组id
+        costMinuterDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
+        //创意id
+        costMinuterDM.adcreativeId = "";
+        //账号id
+        costMinuterDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
+        //当天分钟消耗
+        costMinuterDM.costMinute = adStatOfMinuteDWD.getCostMinute();
+        //消耗速度-----分钟最小为1,部分分钟的速度会慢
+        costMinuterDM.costSpeed = adStatOfMinuteDWD.getCostMinute() / this.minutenow;
+        //分钟-总浏览量
+        costMinuterDM.viewCountMinute = adStatOfMinuteDWD.getViewCountMinute();
+        //分钟-总平均曝光成本
+        costMinuterDM.thousandDisplayPriceMinute = adStatOfMinuteDWD.getThousandDisplayPriceMinute();
+        //分钟-总点击量
+        costMinuterDM.validClickCountMinute = adStatOfMinuteDWD.getValidClickCountMinute();
+        //分钟-总平均点击率
+        costMinuterDM.ctrMinute = adStatOfMinuteDWD.getCtrMinute();
+        //分钟-总点击均价
+        costMinuterDM.cpcMinute = adStatOfMinuteDWD.getCpcMinute();
+        //分钟-总目标转化量
+        costMinuterDM.conversionsCountMinute = adStatOfMinuteDWD.getConversionsCountMinute();
+        //分钟-总目标平均转化成本
+        costMinuterDM.conversionsCostMinute = adStatOfMinuteDWD.getConversionsCostMinute();
+        //分钟-总平均转化率
+        costMinuterDM.conversionsRateMinute = adStatOfMinuteDWD.getConversionsRateMinute();
+        //TODO:分钟-总首日下单roi
+        costMinuterDM.firstDayOrderRoiMinute = 0;
+        //分钟-总首日下单金额
+        costMinuterDM.firstDayOrderAmountMinute = adStatOfMinuteDWD.getFirstDayOrderAmountMinute();
+        //分钟-总首日下单量
+        costMinuterDM.firstDayOrderCountMinute = adStatOfMinuteDWD.getFirstDayOrderCountMinute();
+        //分钟-总下单金额
+        costMinuterDM.webOrderAmountMinute = adStatOfMinuteDWD.getOrderAmountMinute();
+        //分钟-总平均下单成本
+        costMinuterDM.webOrderCostMinute = adStatOfMinuteDWD.getWebOrderCostMinute();
+        //分钟-总平均下单率
+        costMinuterDM.webOrderRateMinute = adStatOfMinuteDWD.getOrderRateMinute();
+        //TODO:分钟-总平均下单量
+        costMinuterDM.webOrderCountMinute = adStatOfMinuteDWD.getOrderCountMinute();
+        //分钟-总下单roi
+        costMinuterDM.orderRoiMinute = adStatOfMinuteDWD.getOrderRoiMinute();
+        //分钟-总平均下单客单价
+        costMinuterDM.orderUnitPriceMinute = adStatOfMinuteDWD.getOrderUnitPriceMinute();
+        //分钟-总公众号关注量
+        costMinuterDM.fromFollowUvMinute = adStatOfMinuteDWD.getOfficialAccountFollowCountMinute();
+        //TODO:分钟-总平均公众号关注成本
+        costMinuterDM.fromFollowCostMinute = adStatOfMinuteDWD.getOfficialAccountFollowCountMinute() == 0 ? 0 : adStatOfMinuteDWD.getCostMinute() / adStatOfMinuteDWD.getOfficialAccountFollowCountMinute();
+        //TODO:分钟-总平均公众号关注率
+        costMinuterDM.fromFollowRateMinute = adStatOfMinuteDWD.getOfficialAccountFollowRateMinute();
+        //TODO:分钟-总注册数-----下面都有问题
+        costMinuterDM.webRegisterCountMinute = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountMinute();
+        //分钟-总注册人数
+        costMinuterDM.webRegisterUvMinute = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountMinute();
+        //分钟-总平均注册成本
+        costMinuterDM.webRegisterCostMinute = adStatOfMinuteDWD.getOfficialAccountRegisterCostMinute();
+        return costMinuterDM;
+    }
+
+
+    @Override
+    public void process(Long elementCount, ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow>.Context context,
+                        Iterable<AdStatOfMinuteDWD> iterable, Collector<CostMinuterDM> collector) throws Exception {
+        long beginTime = context.window().getStart();
+        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
+        LocalDate beginDate = beginDateTime.toLocalDate();
+        //当前天
+        String statDay = DateUtil.formatLocalDate(beginDate);
+        System.out.println(statDay);
+        //当前小时
+        int hourInt = beginDateTime.getHour();
+        String hour = beginDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastHour = beginDateTime.minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastTwoHour = beginDateTime.minusHours(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastThreeHour = beginDateTime.minusHours(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+
+        this.minutenow = (beginDateTime.getMinute() % 5) == 0 ? 1 : (beginDateTime.getMinute() % 5);
+        long now = System.currentTimeMillis();
+
+        //获取前几分钟
+        List<AdStatOfMinuteDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
+
+        for (AdStatOfMinuteDWD adStatOfMinuteDWD : iterable) {
+            adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
+            CostMinuterDM costMinuterDM = new CostMinuterDM();
+
+            if (adStatOfMinuteDWD.getHour() != hourInt) {
+                continue;
+            }
+
+            String adId = adStatOfMinuteDWD.getAdId().toString();
+            String sql = "select " +
+                    "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
+                    "if(hour='" + lastTwoHour + "',cost_hour,0) last_two_hour_cost, " +
+                    "if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0) cost_last_hour_diff, " +
+                    "(if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0))*(if(hour='" + lastTwoHour + "',cost_hour,0) - if(hour='" + lastThreeHour + "',cost_hour,0)) cost_last_three_trend " +
+                    "from data_monitoring.cost_hour ch " +
+                    "where dt='" + statDay + "' and ad_id='" + adId + "' ";
+
+            System.out.println(sql);
+            Statement statement = connection.createStatement();
+            ResultSet rs = statement.executeQuery(sql);
+            while (rs.next()) {
+                costMinuterDM.costLastHour = rs.getLong(1);
+                costMinuterDM.costLastTwoHour = rs.getLong(2);
+                costMinuterDM.costLastHourDiff = rs.getLong(3);
+                costMinuterDM.costLastThreeTrend = rs.getLong(4);
+
+            }
+
+
+            CostMinuterDM CostMinuterDM_new = datachange(adStatOfMinuteDWD, costMinuterDM);
+
+            collector.collect(CostMinuterDM_new);
+            System.out.println("costminute_输出:" + JsonUtil.toString(CostMinuterDM_new));
+        }
+        System.out.println("costminute_windowCount:" + adStatOfMinuteDWDlist.size());
+
+
+    }
+
+    @Override
+    public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostMinuterDM, Long, TimeWindow>.Context context) throws Exception {
+        System.out.println("窗口关闭");
+    }
+
+}

+ 144 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/sink/ClickhouseBatchStreamSink.java

@@ -0,0 +1,144 @@
+package flink.zanxiangnet.ad.monitoring.sink;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
+import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
+import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 批量数据写出
+ *
+ * @param <IN>
+ */
+public class ClickhouseBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<IN> {
+    private static final Logger log = LoggerFactory.getLogger(ClickhouseBatchStreamSink.class);
+
+    // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
+    private static final Object DUMMY_LOCK = new Object();
+
+    private final Class<T> clazz;
+    private String projectName;
+    private String tableName;
+
+    private volatile transient TableTunnel tunnel;
+    private volatile transient List<BeanUtil.FieldInfo> fieldInfoList;
+    private volatile transient Map<String, Method> partitionFieldMethods;
+
+    public ClickhouseBatchStreamSink(Class<T> clazz) {
+        this.clazz = clazz;
+    }
+
+    @Override
+    public void open(Configuration config) {
+        if (tunnel == null) {
+            synchronized (DUMMY_LOCK) {
+                if (tunnel == null) {
+                    Map<String, String> params = getRuntimeContext()
+                            .getExecutionConfig()
+                            .getGlobalJobParameters()
+                            .toMap();
+                    MaxComputeTable tableAnnotation = clazz.getAnnotation(MaxComputeTable.class);
+
+                    Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
+                            params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
+                    Odps odps = new Odps(account);
+                    odps.getRestClient().setRetryLogger(new MaxComputeLog());
+                    odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
+                    odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+                    tunnel = new TableTunnel(odps);
+                    tunnel.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT));
+                    projectName = params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME);
+                    tableName = tableAnnotation.value();
+                    fieldInfoList = BeanUtil.parseBeanField(clazz);
+                    partitionFieldMethods = fieldInfoList.stream().filter(BeanUtil.FieldInfo::isUsePartitioned).collect(Collectors.toMap(BeanUtil.FieldInfo::getColumnName, BeanUtil.FieldInfo::getGetMethod));
+                }
+            }
+        }
+    }
+
+    /**
+     * 将值写入到 Sink。每个值都会调用此函数
+     *
+     * @param value
+     * @param context
+     */
+    @Override
+    public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
+        T element = value.get(0);
+        String partitionStr = generatePartitionStr(element);
+        System.out.println("[" + tableName + "]写入数据量:" + value.size() + "写入分区:" + partitionStr);
+        TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, StringUtils.isBlank(partitionStr) ? null : new PartitionSpec(partitionStr), true);
+        TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
+        for (T t : value) {
+            Record record = uploadSession.newRecord();
+            for (BeanUtil.FieldInfo fieldInfo : fieldInfoList) {
+                if (fieldInfo.isUsePartitioned()) {
+                    // 分区字段不在这里设值
+                    continue;
+                }
+                Object obj = fieldInfo.getGetMethod().invoke(t);
+                record.set(fieldInfo.getColumnName(), obj);
+            }
+            // append只是写入内存
+            pack.append(record);
+        }
+        int retry = 0;
+        do {
+            try {
+                // 大概用时 100ms ~ 3s
+                pack.flush();
+                break;
+            } catch (IOException e) {
+                if (retry == 3) {
+                    log.error("Flush data error!msg: " + e.getMessage());
+                    throw e;
+                }
+            }
+        } while (retry++ < 3);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+
+    private String generatePartitionStr(T t) {
+        if (CollectionUtils.isEmpty(partitionFieldMethods)) {
+            return null;
+        }
+        StringBuilder partition = new StringBuilder();
+        for (Map.Entry<String, Method> entry : partitionFieldMethods.entrySet()) {
+            partition.append(entry.getKey()).append("=");
+            try {
+                partition.append(entry.getValue().invoke(t));
+            } catch (InvocationTargetException | IllegalAccessException e) {
+                // 获取分区字段的值失败
+                log.error(e.getMessage(), e);
+                throw new RuntimeException("Failed get partition field value!");
+            }
+            partition.append(",");
+        }
+        partition = new StringBuilder(partition.substring(0, partition.length() - 1));
+        return partition.toString();
+    }
+}

+ 39 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/trigger/CostMinuteDMStreamTrigger.java

@@ -0,0 +1,39 @@
+package flink.zanxiangnet.ad.monitoring.trigger;
+
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdDataOfMinuteODS;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class CostMinuteDMStreamTrigger extends Trigger<AdStatOfMinuteDWD, TimeWindow> {
+    @Override
+    public TriggerResult onElement(AdStatOfMinuteDWD adStatOfMinuteDWD, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        // FIXME 水印时间没有意义!拿到数据是 Long.MAX
+        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
+            // 到了窗口的最大生命周期
+            return TriggerResult.FIRE_AND_PURGE;
+        }
+        triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
+        if (adStatOfMinuteDWD.getHour() == DateUtil.milliToLocalDateTime(time).getHour()) {
+            return TriggerResult.FIRE;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onProcessingTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        return time == timeWindow.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
+        triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
+    }
+}