ソースを参照

MOD:小时流获取修改

cxyu 3 年 前
コミット
90acdfaf4e

+ 97 - 87
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/AdStatJob.java

@@ -16,10 +16,7 @@ import flink.zanxiangnet.ad.monitoring.pojo.dto.AdStatOfDayODSDTO;
 import flink.zanxiangnet.ad.monitoring.pojo.entity.*;
 import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
 import flink.zanxiangnet.ad.monitoring.pojo.dto.AdDataOfMinuteDTO;
-import flink.zanxiangnet.ad.monitoring.process.AdMinuteDWDProcess;
-import flink.zanxiangnet.ad.monitoring.process.CostHourDayProcess;
-import flink.zanxiangnet.ad.monitoring.process.CostHourProcess;
-import flink.zanxiangnet.ad.monitoring.process.CostMinuteProcess;
+import flink.zanxiangnet.ad.monitoring.process.*;
 import flink.zanxiangnet.ad.monitoring.sink.TunnelBatchStreamSink;
 import flink.zanxiangnet.ad.monitoring.stream.KeyedBatchStream;
 import flink.zanxiangnet.ad.monitoring.trigger.AdMinuteODSStreamTrigger;
@@ -164,6 +161,19 @@ public class AdStatJob {
         clickhouseMinuteDmStream.addSink(batchSinkMinute);
 
 
+        //cost----小时数据处理
+        SingleOutputStreamOperator<CostHourDM> clickhouseMinuteHourDmStream =
+                adMinuteDWDStream
+                        .keyBy(AdStatOfMinuteDWD::getAdId)
+                        .window(TumblingEventTimeWindows.of(Time.minutes(5L)))
+                        .trigger(new CostMinuteDMStreamTrigger())
+                        .process(new CostMinuteHourProcess())
+                        .name("sink_ad_minute_hour_dm_clickhouse");
+
+        BatchSinkHour batchSinkMinuteHour = new BatchSinkHour();
+        clickhouseMinuteHourDmStream.addSink(batchSinkMinuteHour);
+
+
         // 小时流(直接写到小时报表的 ods)
         DataStream<AdDataOfHourODS> adHourODSStream = adODSStream.getSideOutput(adHourStreamTag);
         // 写入原始表
@@ -175,69 +185,69 @@ public class AdStatJob {
 
         SingleOutputStreamOperator<AdStatOfHourDWD> adHourDWDStream =
                 adHourODSStream.keyBy(AdDataOfHourODS::getAdId)
-                .countWindow(1).
-                process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
-                    private Odps odps;
-                    // 上次查询的天数据
-                    private ValueState<String> lastQueryDayState;
-                    // 聚合的天的数据
-                    private MapState<String, AdStatOfHourDWD> historyReduceState;
-
-                    @Override
-                    public void open(Configuration conf) {
-                        Map<String, String> params = getRuntimeContext()
-                                .getExecutionConfig()
-                                .getGlobalJobParameters()
-                                .toMap();
-                        Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
-                                params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
-                        odps = new Odps(account);
-                        odps.getRestClient().setRetryLogger(new MaxComputeLog());
-                        odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
-                        odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
-
-                        lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
-                        historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.POJO(AdStatOfHourDWD.class)));
-                    }
-
-                    @Override
-                    public void process(Long elementsCount, ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>.Context context, Iterable<AdDataOfHourODS> iterable, Collector<AdStatOfHourDWD> collector) throws Exception {
-                        AdDataOfHourODS element = iterable.iterator().next();
-                        LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
-                        LocalDateTime statTime = LocalDateTime.of(statDay, LocalTime.of(element.getHour(), 0, 0));
-                        long now = System.currentTimeMillis();
-                        LocalDate today = LocalDate.now();
-
-                        String lastQueryDay = lastQueryDayState.value();
-                        // 从 maxCompute拉取指定 广告的历史数据
-                        if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
-                            LocalDate endDay = today, beginDay = statDay.minusDays(60);
-                            String sql = "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";";
-                            Instance instance = SQLTask.run(odps, sql);
-                            System.out.println("212===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
-                            instance.waitForSuccess();
-                            List<Record> records = SQLTask.getResult(instance);
-                            Map<String, AdStatOfHourDWD> historyHourMap = records.stream()
-                                    .map(AdStatOfHourDWD::byMaxCompute)
-                                    .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
-                                    .collect(Collectors.toMap(data -> data.getStatDay() + data.getHour(), data -> data, (val1, val2) -> val2));
-                            historyReduceState.clear();
-                            historyReduceState.putAll(historyHourMap);
-                            lastQueryDayState.update(DateUtil.formatLocalDate(today));
-                        }
-                        AdStatOfHourDWD lastReduceData = null;
-                        for (int i = 1; i < 60 * 24; i++) {
-                            LocalDateTime time = statTime.minusHours(i);
-                            lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(time.toLocalDate()) + time.getHour());
-                            if (lastReduceData != null) {
-                                break;
+                        .countWindow(1).
+                        process(new ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>() {
+                            private Odps odps;
+                            // 上次查询的天数据
+                            private ValueState<String> lastQueryDayState;
+                            // 聚合的天的数据
+                            private MapState<String, AdStatOfHourDWD> historyReduceState;
+
+                            @Override
+                            public void open(Configuration conf) {
+                                Map<String, String> params = getRuntimeContext()
+                                        .getExecutionConfig()
+                                        .getGlobalJobParameters()
+                                        .toMap();
+                                Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
+                                        params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
+                                odps = new Odps(account);
+                                odps.getRestClient().setRetryLogger(new MaxComputeLog());
+                                odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
+                                odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
+
+                                lastQueryDayState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryDayState", String.class));
+                                historyReduceState = getRuntimeContext().getMapState(new MapStateDescriptor<>("historyReduceState", Types.STRING, Types.POJO(AdStatOfHourDWD.class)));
                             }
-                        }
 
-                        AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(lastReduceData, element, now);
-                        collector.collect(newStatData);
-                    }
-                });
+                            @Override
+                            public void process(Long elementsCount, ProcessWindowFunction<AdDataOfHourODS, AdStatOfHourDWD, Long, GlobalWindow>.Context context, Iterable<AdDataOfHourODS> iterable, Collector<AdStatOfHourDWD> collector) throws Exception {
+                                AdDataOfHourODS element = iterable.iterator().next();
+                                LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
+                                LocalDateTime statTime = LocalDateTime.of(statDay, LocalTime.of(element.getHour(), 0, 0));
+                                long now = System.currentTimeMillis();
+                                LocalDate today = LocalDate.now();
+
+                                String lastQueryDay = lastQueryDayState.value();
+                                // 从 maxCompute拉取指定 广告的历史数据
+                                if (lastQueryDay == null || !lastQueryDay.equals(DateUtil.formatLocalDate(today))) {
+                                    LocalDate endDay = today, beginDay = statDay.minusDays(60);
+                                    String sql = "SELECT * FROM ad_stat_of_hour_dwd WHERE stat_day >= \"" + DateUtil.formatLocalDate(beginDay) + "\" AND stat_day <= \"" + DateUtil.formatLocalDate(endDay) + "\" AND ad_id = " + element.getAdId() + ";";
+                                    Instance instance = SQLTask.run(odps, sql);
+                                    System.out.println("212===>sql: " + sql + ", odps日志: " + odps.logview().generateLogView(instance, 7 * 24));
+                                    instance.waitForSuccess();
+                                    List<Record> records = SQLTask.getResult(instance);
+                                    Map<String, AdStatOfHourDWD> historyHourMap = records.stream()
+                                            .map(AdStatOfHourDWD::byMaxCompute)
+                                            .sorted((o1, o2) -> new Long(o1.getCreateTime().getTime() - o2.getCreateTime().getTime()).intValue())
+                                            .collect(Collectors.toMap(data -> data.getStatDay() + data.getHour(), data -> data, (val1, val2) -> val2));
+                                    historyReduceState.clear();
+                                    historyReduceState.putAll(historyHourMap);
+                                    lastQueryDayState.update(DateUtil.formatLocalDate(today));
+                                }
+                                AdStatOfHourDWD lastReduceData = null;
+                                for (int i = 1; i < 60 * 24; i++) {
+                                    LocalDateTime time = statTime.minusHours(i);
+                                    lastReduceData = historyReduceState.get(DateUtil.formatLocalDate(time.toLocalDate()) + time.getHour());
+                                    if (lastReduceData != null) {
+                                        break;
+                                    }
+                                }
+
+                                AdStatOfHourDWD newStatData = AdStatOfHourDWD.reduce(lastReduceData, element, now);
+                                collector.collect(newStatData);
+                            }
+                        });
         //.addSink(new TunnelBatchSink<>(AdStatOfHourDWD.class, 30000L, 365L, 6));
         new KeyedBatchStream<>("adHourDWDStream", adHourDWDStream.keyBy(AdStatOfHourDWD::getStatDay), 4000L, 60 * 1000L)
                 .toBatch()
@@ -380,17 +390,17 @@ public class AdStatJob {
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .name("sink_ad_day_dwd");
 
-        //小时数据-----天
-        SingleOutputStreamOperator<CostHourDM> clickhouseHourDayDmStream =
-                adDayDWDStream
-                        .keyBy(AdStatOfDayDWD::getAdId)
-                        .countWindow(1)
-                        .process(new CostHourDayProcess())
-                        .name("sink_ad_hour_day_dm_clickhouse");
-
-//        clickhouseHourDayDmStream.print();
-        BatchSinkHour batchSinkHourDay = new BatchSinkHour();
-        clickhouseHourDayDmStream.addSink(batchSinkHourDay);
+//        //小时数据-----天
+//        SingleOutputStreamOperator<CostHourDM> clickhouseHourDayDmStream =
+//                adDayDWDStream
+//                        .keyBy(AdStatOfDayDWD::getAdId)
+//                        .countWindow(1)
+//                        .process(new CostHourDayProcess())
+//                        .name("sink_ad_hour_day_dm_clickhouse");
+//
+////        clickhouseHourDayDmStream.print();
+//        BatchSinkHour batchSinkHourDay = new BatchSinkHour();
+//        clickhouseHourDayDmStream.addSink(batchSinkHourDay);
 
 
         SingleOutputStreamOperator<AdStatOfDayDWD> adDayDWDYearStream = adDayODSStreamSplit.getSideOutput(adDayStreamRollYearTag)
@@ -417,17 +427,17 @@ public class AdStatJob {
                 .toBatch()
                 .addSink(new TunnelBatchStreamSink<>(AdStatOfDayDWD.class))
                 .name("sink_ad_year_dwd");
-        //小时数据----年
-        SingleOutputStreamOperator<CostHourDM> clickhouseHourYearDmStream =
-                adDayDWDYearStream
-                        .keyBy(AdStatOfDayDWD::getAdId)
-                        .countWindow(1)
-                        .process(new CostHourDayProcess())
-                        .name("sink_ad_hour_year_dm_clickhouse");
-
-//        clickhouseHourYearDmStream.print();
-        BatchSinkHour batchSinkHourYear = new BatchSinkHour();
-        clickhouseHourYearDmStream.addSink(batchSinkHourYear);
+//        //小时数据----年
+//        SingleOutputStreamOperator<CostHourDM> clickhouseHourYearDmStream =
+//                adDayDWDYearStream
+//                        .keyBy(AdStatOfDayDWD::getAdId)
+//                        .countWindow(1)
+//                        .process(new CostHourDayProcess())
+//                        .name("sink_ad_hour_year_dm_clickhouse");
+//
+////        clickhouseHourYearDmStream.print();
+//        BatchSinkHour batchSinkHourYear = new BatchSinkHour();
+//        clickhouseHourYearDmStream.addSink(batchSinkHourYear);
 
         //TODO:jobname----后面应该带个时间以此来区分log以及job
         env.execute("job_ad");

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkHour.java

@@ -230,10 +230,10 @@ public class BatchSinkHour extends RichSinkFunction<CostHourDM> {
         System.out.println("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
         //clickhouse 处理重复数据
         //TODO:数据去重有问题,去除掉非最新的数据
-        Statement statement_duplicate = connection.createStatement();
-        String sql_duplicate = "optimize table data_monitoring.cost_hour final;";
-        statement_duplicate.executeQuery(sql_duplicate);
-        connection.commit();
+//        Statement statement_duplicate = connection.createStatement();
+//        String sql_duplicate = "optimize table data_monitoring.cost_hour final;";
+//        statement_duplicate.executeQuery(sql_duplicate);
+//        connection.commit();
         long endTime_dp = System.currentTimeMillis();
         System.out.println("数据清理耗时: " + (endTime_dp - endTime));
 

+ 5 - 5
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/clickhouse/sink/BatchSinkMinute.java

@@ -82,7 +82,7 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n" +
                 "?,?,?,?,?,?,?,?,?,?,\n)";
-        System.out.println(costMinuterDM);
+//        System.out.println(costMinuterDM);
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         preparedStatement.setString(1, costMinuterDM.dt);
         preparedStatement.setString(2, costMinuterDM.minute);
@@ -132,10 +132,10 @@ public class BatchSinkMinute extends RichSinkFunction<CostMinuterDM> {
         long endTime = System.currentTimeMillis();
         System.out.println("批量插入耗时: " + (endTime - startTime) + "插入数据数量=" + ints.length);
         //TODO:数据去重有问题,去除掉非最新的数据
-        Statement statement_duplicate = connection.createStatement();
-        String sql_duplicate = "optimize table data_monitoring.cost_minute final;";
-        statement_duplicate.executeQuery(sql_duplicate);
-        connection.commit();
+//        Statement statement_duplicate = connection.createStatement();
+//        String sql_duplicate = "optimize table data_monitoring.cost_minute final;";
+//        statement_duplicate.executeQuery(sql_duplicate);
+//        connection.commit();
         long endTime_dp = System.currentTimeMillis();
         System.out.println("数据清理耗时: " + (endTime_dp - endTime));
 

+ 4 - 4
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourDayProcess.java

@@ -49,7 +49,7 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
         costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
         //时间-小时
         //TODO:之后需要进一步修改
-        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " 24:00:00";
+        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " 23:00:00";
         //广告id
         costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
         //广告组id
@@ -259,7 +259,7 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
                     "and dt='" + adStatOfMinuteDWD.getStatDay() + "' and hour='" + tmpHour + "'";
 
 
-            System.out.println(sql);
+//            System.out.println(sql);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(sql);
             while (rs.next()) {
@@ -296,9 +296,9 @@ public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, Co
             CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
 
             collector.collect(costHourDM_new);
-            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
+//            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
         }
-        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
+//        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
 
 
     }

+ 5 - 6
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostHourProcess.java

@@ -215,7 +215,6 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
     public void process(Long elementCount, ProcessWindowFunction<AdStatOfHourDWD, CostHourDM, Long, GlobalWindow>.Context context,
                         Iterable<AdStatOfHourDWD> iterable, Collector<CostHourDM> collector) throws Exception {
 
-        System.out.println("进入get in ");
         //获取前几分钟
 
 
@@ -242,7 +241,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
             tmpHourInt = adStatOfMinuteDWD.getHour() - 3;
             tmpHour = tmpHourInt > 9 ? String.valueOf(tmpHourInt) : "0" + tmpHourInt;
             String lastThreeHour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
-
+            //TODO:优化为本地拿到小时数据
             String adId = adStatOfMinuteDWD.getAdId().toString();
             String sql = "select " +
                     "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
@@ -252,7 +251,7 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
                     "from data_monitoring.cost_hour ch " +
                     "where dt='" + statDay + "' and ad_id='" + adId + "' ";
 
-            System.out.println(sql);
+//            System.out.println(sql);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(sql);
             while (rs.next()) {
@@ -265,10 +264,10 @@ public class CostHourProcess extends ProcessWindowFunction<AdStatOfHourDWD, Cost
             CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
 
             collector.collect(costHourDM_new);
-            System.out.println("costhour_输入" + JsonUtil.toString(adStatOfMinuteDWD));
-            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
+//            System.out.println("costhour_输入" + JsonUtil.toString(adStatOfMinuteDWD));
+//            System.out.println("costhour_输出:" + JsonUtil.toString(costHourDM_new));
         }
-        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
+//        System.out.println("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
 
 
     }

+ 278 - 0
flink-ad-monitoring/src/main/java/flink/zanxiangnet/ad/monitoring/process/CostMinuteHourProcess.java

@@ -0,0 +1,278 @@
+package flink.zanxiangnet.ad.monitoring.process;
+
+import flink.zanxiangnet.ad.monitoring.AdStatJob;
+import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
+import flink.zanxiangnet.ad.monitoring.pojo.entity.CostMinuterDM;
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+public class CostMinuteHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow> {
+    private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
+    private Connection connection = null;
+    private int minutenow = 1;
+
+    @Override
+    public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
+        Properties props = new Properties();
+        props.load(AdStatJob.class.getResourceAsStream("/application.properties"));
+        connection = ClickhouseUtil.getConn(props, "cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com",
+                "8123", "data_monitoring");
+    }
+
+    //数据格式转换
+    public CostHourDM datachange(AdStatOfMinuteDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
+        //时间-天
+        costHourDM.dt = adStatOfMinuteDWD.getStatDay();
+        //计划 id
+        costHourDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
+        //时间- real
+        costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
+        //时间-小时
+        //TODO:之后需要进一步修改
+        String tmpHour = adStatOfMinuteDWD.getHour() > 9 ? adStatOfMinuteDWD.getHour().toString() : "0" + adStatOfMinuteDWD.getHour().toString();
+        costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
+        //广告id
+        costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
+        //广告组id
+        costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
+        //创意id
+        costHourDM.adcreativeId = "";
+        //账号id
+        costHourDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
+        //总消耗
+        costHourDM.costTotal = adStatOfMinuteDWD.getCostTotal();
+        //当天消耗
+        costHourDM.costDay = adStatOfMinuteDWD.getCostDay();
+        //当天小时消耗
+        costHourDM.costHour = adStatOfMinuteDWD.getCostHour();
+        //消耗速度
+        costHourDM.costSpeed = adStatOfMinuteDWD.getCostHour();
+        //总浏览量
+        costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
+        //天-总浏览量
+        costHourDM.viewCountDay = adStatOfMinuteDWD.getViewCountDay();
+        //小时-总浏览量
+        costHourDM.viewCountHour = adStatOfMinuteDWD.getViewCountHour();
+        //总平均千次曝光成本
+        costHourDM.thousandDisplayPriceAll = adStatOfMinuteDWD.getThousandDisplayPriceAll();
+        //天-总平均曝光成本
+        costHourDM.thousandDisplayPriceDay = adStatOfMinuteDWD.getThousandDisplayPriceDay();
+        //小时-总平均曝光成本
+        costHourDM.thousandDisplayPriceHour = adStatOfMinuteDWD.getThousandDisplayPriceHour();
+        //总点击量
+        costHourDM.validClickCountTotal = adStatOfMinuteDWD.getValidClickCountTotal();
+        //天-总点击量
+        costHourDM.validClickCountDay = adStatOfMinuteDWD.getValidClickCountDay();
+        //小时-总点击量
+        costHourDM.validClickCountHour = adStatOfMinuteDWD.getValidClickCountHour();
+        //总平均点击率
+        costHourDM.ctrAll = adStatOfMinuteDWD.getCtrAll();
+        //天-总平均点击率
+        costHourDM.ctrDay = adStatOfMinuteDWD.getCtrDay();
+        //小时-总平均点击率
+        costHourDM.ctrHour = adStatOfMinuteDWD.getCtrHour();
+        //总点击均价
+        costHourDM.cpcAll = adStatOfMinuteDWD.getCpcAll();
+        //天-总点击均价
+        costHourDM.cpcDay = adStatOfMinuteDWD.getCpcDay();
+        //小时-总点击均价
+        costHourDM.cpcHour = adStatOfMinuteDWD.getCpcHour();
+        //总目标转化量
+        costHourDM.conversionsCountTotal = adStatOfMinuteDWD.getConversionsCountTotal();
+        //天-总目标转化量
+        costHourDM.conversionsCountDay = adStatOfMinuteDWD.getConversionsCountDay();
+        //小时-总目标转化量
+        costHourDM.conversionsCountHour = adStatOfMinuteDWD.getConversionsCountHour();
+        //总目标平均转化成本
+        costHourDM.conversionsCostTotal = adStatOfMinuteDWD.getConversionsCostAll();
+        //天-总目标平均转化成本
+        costHourDM.conversionsCostDay = adStatOfMinuteDWD.getConversionsCostDay();
+        //小时-总目标平均转化成本
+        costHourDM.conversionsCostHour = adStatOfMinuteDWD.getConversionsCostHour();
+        //总平均转化率
+        costHourDM.conversionsRateAll = adStatOfMinuteDWD.getConversionsRateAll();
+        //天-总平均转化率
+        costHourDM.conversionsRateDay = adStatOfMinuteDWD.getConversionsRateDay();
+        //小时-总平均转化率
+        costHourDM.conversionsRateHour = adStatOfMinuteDWD.getConversionsRateHour();
+        //TODO:总首日下单roi
+        costHourDM.firstDayOrderRoiTotal = 0;
+        //天-总首日下单roi
+        costHourDM.firstDayOrderRoiDay = 0;
+        //小时-总首日下单roi
+        costHourDM.firstDayOrderRoiHour = 0;
+        //总首日下单金额
+        costHourDM.firstDayOrderAmountTotal = adStatOfMinuteDWD.getFirstDayOrderAmountTotal();
+        //天-总首日下单金额
+        costHourDM.firstDayOrderAmountDay = adStatOfMinuteDWD.getFirstDayOrderAmountDay();
+        //小时-总首日下单金额
+        costHourDM.firstDayOrderAmountHour = adStatOfMinuteDWD.getFirstDayOrderAmountHour();
+        //总首日下单量
+        costHourDM.firstDayOrderCountTotal = adStatOfMinuteDWD.getFirstDayOrderCountTotal();
+        //天-总首日下单量
+        costHourDM.firstDayOrderCountDay = adStatOfMinuteDWD.getFirstDayOrderCountDay();
+        //小时-总首日下单量
+        costHourDM.firstDayOrderCountHour = adStatOfMinuteDWD.getFirstDayOrderCountHour();
+        //总下单金额
+        costHourDM.webOrderAmountTotal = adStatOfMinuteDWD.getOrderAmountTotal();
+        //天-总下单金额
+        costHourDM.webOrderAmountDay = adStatOfMinuteDWD.getOrderAmountDay();
+        //小时-总下单金额
+        costHourDM.webOrderAmountHour = adStatOfMinuteDWD.getOrderAmountHour();
+        //总平均下单成本
+        costHourDM.webOrderCostTotal = adStatOfMinuteDWD.getWebOrderCostAll();
+        //天-总平均下单成本
+        costHourDM.webOrderCostDay = adStatOfMinuteDWD.getWebOrderCostDay();
+        //小时-总平均下单成本
+        costHourDM.webOrderCostHour = adStatOfMinuteDWD.getWebOrderCostHour();
+        //总平均下单率
+        costHourDM.webOrderRateTotal = adStatOfMinuteDWD.getOrderRateAll();
+        //天-总平均下单率
+        costHourDM.webOrderRateDay = adStatOfMinuteDWD.getOrderRateDay();
+        //小时-总平均下单率
+        costHourDM.webOrderRateHour = adStatOfMinuteDWD.getOrderRateHour();
+        //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
+        costHourDM.webOrderCountTotal = adStatOfMinuteDWD.getOrderCountTotal();
+        //天-总平均下单量
+        costHourDM.webOrderCountDay = adStatOfMinuteDWD.getOrderCountDay();
+        //小时-总平均下单量
+        costHourDM.webOrderCountHour = adStatOfMinuteDWD.getOrderCountHour();
+        //总下单ROI
+        costHourDM.orderRoiTotal = adStatOfMinuteDWD.getOrderRoiAll();
+        //天-总下单roi
+        costHourDM.orderRoiDay = adStatOfMinuteDWD.getOrderRoiDay();
+        //小时-总下单roi
+        costHourDM.orderRoiHour = adStatOfMinuteDWD.getOrderRoiHour();
+        //总平均下单客单价
+        costHourDM.orderUnitPriceTotal = adStatOfMinuteDWD.getOrderUnitPriceAll();
+        //天-总平均下单客单价
+        costHourDM.orderUnitPriceDay = adStatOfMinuteDWD.getOrderUnitPriceDay();
+        //小时-总平均下单客单价
+        costHourDM.orderUnitPriceHour = adStatOfMinuteDWD.getOrderUnitPriceHour();
+        //总公众号关注量
+        costHourDM.fromFollowUvTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        //天-总公众号关注量
+        costHourDM.fromFollowUvDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        //小时-总公众号关注量
+        costHourDM.fromFollowUvHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
+        //TODO:总平均公众号关注成本---是否是价格/关注
+        costHourDM.fromFollowCostTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
+        //天-总平均公众号关注成本
+        costHourDM.fromFollowCostDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
+        //小时-总平均公众号关注成本
+        costHourDM.fromFollowCostHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour() == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
+        //TODO:总平均公众号关注率----确认是否对应
+        costHourDM.fromFollowRateTotal = adStatOfMinuteDWD.getOfficialAccountFollowRateAll();
+        //天-总平均公众号关注率
+        costHourDM.fromFollowRateDay = adStatOfMinuteDWD.getOfficialAccountFollowRateDay();
+        //小时-总平均公众号关注率
+        costHourDM.fromFollowRateHour = adStatOfMinuteDWD.getOfficialAccountFollowRateHour();
+        //TODO:总注册数-----下面全是有问题的
+        costHourDM.webRegisterCountTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        //天-总注册数
+        costHourDM.webRegisterCountDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        //小时-总注册数
+        costHourDM.webRegisterCountHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
+        //总注册人数
+        costHourDM.webRegisterUvTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
+        //天-总注册人数
+        costHourDM.webRegisterUvDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
+        //小时-总注册人数
+        costHourDM.webRegisterUvHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
+        //总平均注册成本
+        costHourDM.webRegisterCostTotal = adStatOfMinuteDWD.getOfficialAccountRegisterCostAll();
+        //天-总平均注册成本
+        costHourDM.webRegisterCostDay = adStatOfMinuteDWD.getOfficialAccountRegisterCostDay();
+        //小时-总平均注册成本
+        costHourDM.webRegisterCostHour = adStatOfMinuteDWD.getOfficialAccountRegisterCostHour();
+        return costHourDM;
+    }
+
+
+    @Override
+    public void process(Long elementCount, ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context,
+                        Iterable<AdStatOfMinuteDWD> iterable, Collector<CostHourDM> collector) throws Exception {
+        long beginTime = context.window().getStart();
+        LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
+        LocalDate beginDate = beginDateTime.toLocalDate();
+        //当前天
+        String statDay = DateUtil.formatLocalDate(beginDate);
+//        System.out.println(statDay);
+        //当前小时
+        int hourInt = beginDateTime.getHour();
+        String hour = beginDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastHour = beginDateTime.minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastTwoHour = beginDateTime.minusHours(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+        String lastThreeHour = beginDateTime.minusHours(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
+
+        this.minutenow = (beginDateTime.getMinute() % 5) == 0 ? 1 : (beginDateTime.getMinute() % 5);
+        long now = System.currentTimeMillis();
+
+        //获取前几分钟
+        List<AdStatOfMinuteDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
+
+        for (AdStatOfMinuteDWD adStatOfMinuteDWD : iterable) {
+
+            adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
+            CostHourDM costHourDM = new CostHourDM();
+
+            if (adStatOfMinuteDWD.getHour() != hourInt) {
+                continue;
+            }
+
+            String adId = adStatOfMinuteDWD.getAdId().toString();
+            String sql = "select " +
+                    "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
+                    "if(hour='" + lastTwoHour + "',cost_hour,0) last_two_hour_cost, " +
+                    "if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0) cost_last_hour_diff, " +
+                    "(if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0))*(if(hour='" + lastTwoHour + "',cost_hour,0) - if(hour='" + lastThreeHour + "',cost_hour,0)) cost_last_three_trend " +
+                    "from data_monitoring.cost_hour ch " +
+                    "where dt='" + statDay + "' and ad_id='" + adId + "' ";
+
+//            System.out.println(sql);
+            Statement statement = connection.createStatement();
+            ResultSet rs = statement.executeQuery(sql);
+            while (rs.next()) {
+                costHourDM.costLastHour = rs.getLong(1);
+                costHourDM.costLastTwoHour = rs.getLong(2);
+                costHourDM.costLastHourDiff = rs.getLong(3);
+                costHourDM.costLastThreeTrend = rs.getLong(4);
+
+            }
+
+
+            CostHourDM CostHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
+
+            collector.collect(CostHourDM_new);
+            // System.out.println("costminute_输出:" + JsonUtil.toString(CostMinuterDM_new));
+        }
+        // System.out.println("costminute_windowCount:" + adStatOfMinuteDWDlist.size());
+
+
+    }
+
+    @Override
+    public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context) throws Exception {
+    }
+
+}