|
@@ -1,273 +0,0 @@
|
|
|
-package flink.zanxiangnet.ad.monitoring.process;
|
|
|
-
|
|
|
-import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfMinuteDWD;
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
|
|
|
-import flink.zanxiangnet.ad.monitoring.util.DateUtil;
|
|
|
-import org.apache.flink.configuration.Configuration;
|
|
|
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
|
|
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
|
|
-import org.apache.flink.util.Collector;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.sql.Connection;
|
|
|
-import java.sql.ResultSet;
|
|
|
-import java.sql.SQLException;
|
|
|
-import java.sql.Statement;
|
|
|
-import java.text.SimpleDateFormat;
|
|
|
-import java.time.LocalDate;
|
|
|
-import java.time.LocalDateTime;
|
|
|
-import java.time.format.DateTimeFormatter;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Properties;
|
|
|
-
|
|
|
-public class CostMinuteHourProcess extends ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow> {
|
|
|
- private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
|
|
|
- private Connection connection = null;
|
|
|
- private int minutenow = 1;
|
|
|
-
|
|
|
- @Override
|
|
|
- public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
|
|
|
- Properties props = new Properties();
|
|
|
- props.load(CostMinuteHourProcess.class.getResourceAsStream("/application.properties"));
|
|
|
- connection = ClickhouseUtil.getConn(props);
|
|
|
- }
|
|
|
-
|
|
|
- //数据格式转换
|
|
|
- public CostHourDM datachange(AdStatOfMinuteDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
|
|
|
- //时间-天
|
|
|
- costHourDM.dt = adStatOfMinuteDWD.getStatDay();
|
|
|
- //计划 id
|
|
|
- costHourDM.campaignId = adStatOfMinuteDWD.getCampaignId().toString();
|
|
|
- //时间- real
|
|
|
- costHourDM.createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime());
|
|
|
- //时间-小时
|
|
|
- //TODO:之后需要进一步修改
|
|
|
- String tmpHour = adStatOfMinuteDWD.getHour() > 9 ? adStatOfMinuteDWD.getHour().toString() : "0" + adStatOfMinuteDWD.getHour().toString();
|
|
|
- costHourDM.hour = adStatOfMinuteDWD.getStatDay() + " " + tmpHour + ":00:00";
|
|
|
- //广告id
|
|
|
- costHourDM.adId = adStatOfMinuteDWD.getAdId().toString();
|
|
|
- //广告组id
|
|
|
- costHourDM.adgroupId = adStatOfMinuteDWD.getAdgroupId().toString();
|
|
|
- //创意id
|
|
|
- costHourDM.adcreativeId = "";
|
|
|
- //账号id
|
|
|
- costHourDM.accountId = adStatOfMinuteDWD.getAccountId().toString();
|
|
|
- //总消耗
|
|
|
- costHourDM.costTotal = adStatOfMinuteDWD.getCostTotal();
|
|
|
- //当天消耗
|
|
|
- costHourDM.costDay = adStatOfMinuteDWD.getCostDay();
|
|
|
- //当天小时消耗
|
|
|
- costHourDM.costHour = adStatOfMinuteDWD.getCostHour();
|
|
|
- //消耗速度
|
|
|
- costHourDM.costSpeed = adStatOfMinuteDWD.getCostHour();
|
|
|
- //总浏览量
|
|
|
- costHourDM.viewCountTotal = adStatOfMinuteDWD.getViewCountTotal();
|
|
|
- //天-总浏览量
|
|
|
- costHourDM.viewCountDay = adStatOfMinuteDWD.getViewCountDay();
|
|
|
- //小时-总浏览量
|
|
|
- costHourDM.viewCountHour = adStatOfMinuteDWD.getViewCountHour();
|
|
|
- //总平均千次曝光成本
|
|
|
- costHourDM.thousandDisplayPriceAll = adStatOfMinuteDWD.getThousandDisplayPriceAll();
|
|
|
- //天-总平均曝光成本
|
|
|
- costHourDM.thousandDisplayPriceDay = adStatOfMinuteDWD.getThousandDisplayPriceDay();
|
|
|
- //小时-总平均曝光成本
|
|
|
- costHourDM.thousandDisplayPriceHour = adStatOfMinuteDWD.getThousandDisplayPriceHour();
|
|
|
- //总点击量
|
|
|
- costHourDM.validClickCountTotal = adStatOfMinuteDWD.getValidClickCountTotal();
|
|
|
- //天-总点击量
|
|
|
- costHourDM.validClickCountDay = adStatOfMinuteDWD.getValidClickCountDay();
|
|
|
- //小时-总点击量
|
|
|
- costHourDM.validClickCountHour = adStatOfMinuteDWD.getValidClickCountHour();
|
|
|
- //总平均点击率
|
|
|
- costHourDM.ctrAll = adStatOfMinuteDWD.getCtrAll();
|
|
|
- //天-总平均点击率
|
|
|
- costHourDM.ctrDay = adStatOfMinuteDWD.getCtrDay();
|
|
|
- //小时-总平均点击率
|
|
|
- costHourDM.ctrHour = adStatOfMinuteDWD.getCtrHour();
|
|
|
- //总点击均价
|
|
|
- costHourDM.cpcAll = adStatOfMinuteDWD.getCpcAll();
|
|
|
- //天-总点击均价
|
|
|
- costHourDM.cpcDay = adStatOfMinuteDWD.getCpcDay();
|
|
|
- //小时-总点击均价
|
|
|
- costHourDM.cpcHour = adStatOfMinuteDWD.getCpcHour();
|
|
|
- //总目标转化量
|
|
|
- costHourDM.conversionsCountTotal = adStatOfMinuteDWD.getConversionsCountTotal();
|
|
|
- //天-总目标转化量
|
|
|
- costHourDM.conversionsCountDay = adStatOfMinuteDWD.getConversionsCountDay();
|
|
|
- //小时-总目标转化量
|
|
|
- costHourDM.conversionsCountHour = adStatOfMinuteDWD.getConversionsCountHour();
|
|
|
- //总目标平均转化成本
|
|
|
- costHourDM.conversionsCostTotal = adStatOfMinuteDWD.getConversionsCostAll();
|
|
|
- //天-总目标平均转化成本
|
|
|
- costHourDM.conversionsCostDay = adStatOfMinuteDWD.getConversionsCostDay();
|
|
|
- //小时-总目标平均转化成本
|
|
|
- costHourDM.conversionsCostHour = adStatOfMinuteDWD.getConversionsCostHour();
|
|
|
- //总平均转化率
|
|
|
- costHourDM.conversionsRateAll = adStatOfMinuteDWD.getConversionsRateAll();
|
|
|
- //天-总平均转化率
|
|
|
- costHourDM.conversionsRateDay = adStatOfMinuteDWD.getConversionsRateDay();
|
|
|
- //小时-总平均转化率
|
|
|
- costHourDM.conversionsRateHour = adStatOfMinuteDWD.getConversionsRateHour();
|
|
|
- //TODO:总首日下单roi
|
|
|
- costHourDM.firstDayOrderRoiTotal = 0;
|
|
|
- //天-总首日下单roi
|
|
|
- costHourDM.firstDayOrderRoiDay = 0;
|
|
|
- //小时-总首日下单roi
|
|
|
- costHourDM.firstDayOrderRoiHour = 0;
|
|
|
- //总首日下单金额
|
|
|
- costHourDM.firstDayOrderAmountTotal = adStatOfMinuteDWD.getFirstDayOrderAmountTotal();
|
|
|
- //天-总首日下单金额
|
|
|
- costHourDM.firstDayOrderAmountDay = adStatOfMinuteDWD.getFirstDayOrderAmountDay();
|
|
|
- //小时-总首日下单金额
|
|
|
- costHourDM.firstDayOrderAmountHour = adStatOfMinuteDWD.getFirstDayOrderAmountHour();
|
|
|
- //总首日下单量
|
|
|
- costHourDM.firstDayOrderCountTotal = adStatOfMinuteDWD.getFirstDayOrderCountTotal();
|
|
|
- //天-总首日下单量
|
|
|
- costHourDM.firstDayOrderCountDay = adStatOfMinuteDWD.getFirstDayOrderCountDay();
|
|
|
- //小时-总首日下单量
|
|
|
- costHourDM.firstDayOrderCountHour = adStatOfMinuteDWD.getFirstDayOrderCountHour();
|
|
|
- //总下单金额
|
|
|
- costHourDM.webOrderAmountTotal = adStatOfMinuteDWD.getOrderAmountTotal();
|
|
|
- //天-总下单金额
|
|
|
- costHourDM.webOrderAmountDay = adStatOfMinuteDWD.getOrderAmountDay();
|
|
|
- //小时-总下单金额
|
|
|
- costHourDM.webOrderAmountHour = adStatOfMinuteDWD.getOrderAmountHour();
|
|
|
- //总平均下单成本
|
|
|
- costHourDM.webOrderCostTotal = adStatOfMinuteDWD.getWebOrderCostAll();
|
|
|
- //天-总平均下单成本
|
|
|
- costHourDM.webOrderCostDay = adStatOfMinuteDWD.getWebOrderCostDay();
|
|
|
- //小时-总平均下单成本
|
|
|
- costHourDM.webOrderCostHour = adStatOfMinuteDWD.getWebOrderCostHour();
|
|
|
- //总平均下单率
|
|
|
- costHourDM.webOrderRateTotal = adStatOfMinuteDWD.getOrderRateAll();
|
|
|
- //天-总平均下单率
|
|
|
- costHourDM.webOrderRateDay = adStatOfMinuteDWD.getOrderRateDay();
|
|
|
- //小时-总平均下单率
|
|
|
- costHourDM.webOrderRateHour = adStatOfMinuteDWD.getOrderRateHour();
|
|
|
- //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
|
|
|
- costHourDM.webOrderCountTotal = adStatOfMinuteDWD.getOrderCountTotal();
|
|
|
- //天-总平均下单量
|
|
|
- costHourDM.webOrderCountDay = adStatOfMinuteDWD.getOrderCountDay();
|
|
|
- //小时-总平均下单量
|
|
|
- costHourDM.webOrderCountHour = adStatOfMinuteDWD.getOrderCountHour();
|
|
|
- //总下单ROI
|
|
|
- costHourDM.orderRoiTotal = adStatOfMinuteDWD.getOrderRoiAll();
|
|
|
- //天-总下单roi
|
|
|
- costHourDM.orderRoiDay = adStatOfMinuteDWD.getOrderRoiDay();
|
|
|
- //小时-总下单roi
|
|
|
- costHourDM.orderRoiHour = adStatOfMinuteDWD.getOrderRoiHour();
|
|
|
- //总平均下单客单价
|
|
|
- costHourDM.orderUnitPriceTotal = adStatOfMinuteDWD.getOrderUnitPriceAll();
|
|
|
- //天-总平均下单客单价
|
|
|
- costHourDM.orderUnitPriceDay = adStatOfMinuteDWD.getOrderUnitPriceDay();
|
|
|
- //小时-总平均下单客单价
|
|
|
- costHourDM.orderUnitPriceHour = adStatOfMinuteDWD.getOrderUnitPriceHour();
|
|
|
- //总公众号关注量
|
|
|
- costHourDM.fromFollowUvTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
|
|
|
- //天-总公众号关注量
|
|
|
- costHourDM.fromFollowUvDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
|
|
|
- //小时-总公众号关注量
|
|
|
- costHourDM.fromFollowUvHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
|
|
|
- //TODO:总平均公众号关注成本---是否是价格/关注
|
|
|
- costHourDM.fromFollowCostTotal = adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal();
|
|
|
- //天-总平均公众号关注成本
|
|
|
- costHourDM.fromFollowCostDay = adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay();
|
|
|
- //小时-总平均公众号关注成本
|
|
|
- costHourDM.fromFollowCostHour = adStatOfMinuteDWD.getOfficialAccountFollowCountHour() == 0 ? 0 : adStatOfMinuteDWD.getCostHour() / adStatOfMinuteDWD.getOfficialAccountFollowCountHour();
|
|
|
- //TODO:总平均公众号关注率----确认是否对应
|
|
|
- costHourDM.fromFollowRateTotal = adStatOfMinuteDWD.getOfficialAccountFollowRateAll();
|
|
|
- //天-总平均公众号关注率
|
|
|
- costHourDM.fromFollowRateDay = adStatOfMinuteDWD.getOfficialAccountFollowRateDay();
|
|
|
- //小时-总平均公众号关注率
|
|
|
- costHourDM.fromFollowRateHour = adStatOfMinuteDWD.getOfficialAccountFollowRateHour();
|
|
|
- //TODO:总注册数-----下面全是有问题的
|
|
|
- costHourDM.webRegisterCountTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
|
|
|
- //天-总注册数
|
|
|
- costHourDM.webRegisterCountDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
|
|
|
- //小时-总注册数
|
|
|
- costHourDM.webRegisterCountHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
|
|
|
- //总注册人数
|
|
|
- costHourDM.webRegisterUvTotal = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal();
|
|
|
- //天-总注册人数
|
|
|
- costHourDM.webRegisterUvDay = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay();
|
|
|
- //小时-总注册人数
|
|
|
- costHourDM.webRegisterUvHour = adStatOfMinuteDWD.getOfficialAccountRegisterUserCountHour();
|
|
|
- //总平均注册成本
|
|
|
- costHourDM.webRegisterCostTotal = adStatOfMinuteDWD.getOfficialAccountRegisterCostAll();
|
|
|
- //天-总平均注册成本
|
|
|
- costHourDM.webRegisterCostDay = adStatOfMinuteDWD.getOfficialAccountRegisterCostDay();
|
|
|
- //小时-总平均注册成本
|
|
|
- costHourDM.webRegisterCostHour = adStatOfMinuteDWD.getOfficialAccountRegisterCostHour();
|
|
|
- return costHourDM;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @Override
|
|
|
- public void process(Long elementCount, ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context,
|
|
|
- Iterable<AdStatOfMinuteDWD> iterable, Collector<CostHourDM> collector) throws Exception {
|
|
|
- long beginTime = context.window().getStart();
|
|
|
- LocalDateTime beginDateTime = DateUtil.milliToLocalDateTime(beginTime);
|
|
|
- LocalDate beginDate = beginDateTime.toLocalDate();
|
|
|
- //当前天
|
|
|
- String statDay = DateUtil.formatLocalDate(beginDate);
|
|
|
-// System.out.println(statDay);
|
|
|
- //当前小时
|
|
|
- int hourInt = beginDateTime.getHour();
|
|
|
- String hour = beginDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
|
|
- String lastHour = beginDateTime.minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
|
|
- String lastTwoHour = beginDateTime.minusHours(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
|
|
- String lastThreeHour = beginDateTime.minusHours(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
|
|
-
|
|
|
- this.minutenow = (beginDateTime.getMinute() % 5) == 0 ? 1 : (beginDateTime.getMinute() % 5);
|
|
|
- long now = System.currentTimeMillis();
|
|
|
-
|
|
|
- //获取前几分钟
|
|
|
- List<AdStatOfMinuteDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
|
|
|
-
|
|
|
- for (AdStatOfMinuteDWD adStatOfMinuteDWD : iterable) {
|
|
|
-
|
|
|
- adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
|
|
|
- CostHourDM costHourDM = new CostHourDM();
|
|
|
-
|
|
|
- if (adStatOfMinuteDWD.getHour() != hourInt) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- String adId = adStatOfMinuteDWD.getAdId().toString();
|
|
|
- String sql = "select " +
|
|
|
- "if(hour='" + lastHour + "',cost_hour,0) last_hour_cost, " +
|
|
|
- "if(hour='" + lastTwoHour + "',cost_hour,0) last_two_hour_cost, " +
|
|
|
- "if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0) cost_last_hour_diff, " +
|
|
|
- "(if(hour='" + lastHour + "',cost_hour,0) - if(hour='" + lastTwoHour + "',cost_hour,0))*(if(hour='" + lastTwoHour + "',cost_hour,0) - if(hour='" + lastThreeHour + "',cost_hour,0)) cost_last_three_trend " +
|
|
|
- "from data_monitoring.cost_hour ch " +
|
|
|
- "where dt='" + statDay + "' and ad_id='" + adId + "' ";
|
|
|
-
|
|
|
-// System.out.println(sql);
|
|
|
- Statement statement = connection.createStatement();
|
|
|
- ResultSet rs = statement.executeQuery(sql);
|
|
|
- while (rs.next()) {
|
|
|
- costHourDM.costLastHour = rs.getLong(1);
|
|
|
- costHourDM.costLastTwoHour = rs.getLong(2);
|
|
|
- costHourDM.costLastHourDiff = rs.getLong(3);
|
|
|
- costHourDM.costLastThreeTrend = rs.getLong(4);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- CostHourDM CostHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
|
|
|
-
|
|
|
- collector.collect(CostHourDM_new);
|
|
|
- // System.out.println("costminute_输出:" + JsonUtil.toString(CostMinuterDM_new));
|
|
|
- }
|
|
|
- // System.out.println("costminute_windowCount:" + adStatOfMinuteDWDlist.size());
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void clear(ProcessWindowFunction<AdStatOfMinuteDWD, CostHourDM, Long, TimeWindow>.Context context) throws Exception {
|
|
|
- }
|
|
|
-
|
|
|
-}
|