|
@@ -1,301 +0,0 @@
|
|
|
-package flink.zanxiangnet.ad.monitoring.process;
|
|
|
-
|
|
|
-import flink.zanxiangnet.ad.monitoring.clickhouse.sink.ClickhouseUtil;
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfDayDWD;
|
|
|
-import flink.zanxiangnet.ad.monitoring.pojo.entity.CostHourDM;
|
|
|
-import org.apache.flink.configuration.Configuration;
|
|
|
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
|
|
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
|
|
|
-import org.apache.flink.util.Collector;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.sql.Connection;
|
|
|
-import java.sql.ResultSet;
|
|
|
-import java.sql.SQLException;
|
|
|
-import java.sql.Statement;
|
|
|
-import java.text.SimpleDateFormat;
|
|
|
-import java.time.format.DateTimeFormatter;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Properties;
|
|
|
-
|
|
|
-public class CostHourDayProcess extends ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow> {
|
|
|
- private static final DateTimeFormatter formatForLastReduceKey = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm");
|
|
|
- private Connection connection = null;
|
|
|
-
|
|
|
-
|
|
|
- @Override
|
|
|
- public void open(Configuration conf) throws SQLException, ClassNotFoundException, IOException {
|
|
|
- Properties props = new Properties();
|
|
|
- props.load(CostHourDayProcess.class.getResourceAsStream("/application.properties"));
|
|
|
- connection = ClickhouseUtil.getConn(props);
|
|
|
- }
|
|
|
-
|
|
|
- //数据格式转换
|
|
|
- public CostHourDM datachange(AdStatOfDayDWD adStatOfMinuteDWD, CostHourDM costHourDM) {
|
|
|
- //时间-天
|
|
|
- costHourDM.setDt(adStatOfMinuteDWD.getStatDay());
|
|
|
- //计划 id
|
|
|
- costHourDM.setCampaignId(adStatOfMinuteDWD.getCampaignId().toString());
|
|
|
- //时间- real
|
|
|
- costHourDM.setCreateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(adStatOfMinuteDWD.getCreateTime()));
|
|
|
- //时间-小时
|
|
|
- //TODO:之后需要进一步修改
|
|
|
- costHourDM.setHour(adStatOfMinuteDWD.getStatDay() + " 23:00:00");
|
|
|
- //广告id
|
|
|
- costHourDM.setAdId(adStatOfMinuteDWD.getAdId().toString());
|
|
|
- //广告组id
|
|
|
- costHourDM.setAdgroupId(adStatOfMinuteDWD.getAdgroupId().toString());;
|
|
|
- //创意id
|
|
|
- costHourDM.setAdcreativeId("");;
|
|
|
- //账号id
|
|
|
- costHourDM.setAccountId(adStatOfMinuteDWD.getAccountId().toString());
|
|
|
- //总消耗
|
|
|
- costHourDM.setCostTotal(adStatOfMinuteDWD.getCostTotal());
|
|
|
- //当天消耗
|
|
|
- costHourDM.setCostDay(adStatOfMinuteDWD.getCostDay());
|
|
|
- //当天小时消耗
|
|
|
- //TODO:数据--小时会缺失,下面的小时数据同理
|
|
|
- costHourDM.setCostHour(0);
|
|
|
- //消耗速度
|
|
|
- //TODO:如果这个回滚发生在24点有问题,会覆盖掉24点的小时速度
|
|
|
- costHourDM.setCostSpeed(0);
|
|
|
- //总浏览量
|
|
|
- costHourDM.setViewCountTotal(adStatOfMinuteDWD.getViewCountTotal());
|
|
|
- //天-总浏览量
|
|
|
- costHourDM.setViewCountDay(adStatOfMinuteDWD.getViewCountDay());
|
|
|
- //小时-总浏览量
|
|
|
- costHourDM.setViewCountHour(0);
|
|
|
- //总平均千次曝光成本
|
|
|
- costHourDM.setThousandDisplayPriceAll(adStatOfMinuteDWD.getThousandDisplayPriceAll());
|
|
|
- //天-总平均曝光成本
|
|
|
- costHourDM.setThousandDisplayPriceDay(adStatOfMinuteDWD.getThousandDisplayPriceDay());
|
|
|
- //小时-总平均曝光成本
|
|
|
- costHourDM.setThousandDisplayPriceHour(0);
|
|
|
- //总点击量
|
|
|
- costHourDM.setValidClickCountTotal(adStatOfMinuteDWD.getValidClickCountTotal());
|
|
|
- //天-总点击量
|
|
|
- costHourDM.setValidClickCountDay(adStatOfMinuteDWD.getValidClickCountDay());
|
|
|
- //小时-总点击量
|
|
|
- costHourDM.setValidClickCountHour(0);
|
|
|
- //总平均点击率
|
|
|
- costHourDM.setCtrAll(adStatOfMinuteDWD.getCtrAll().doubleValue());
|
|
|
- //天-总平均点击率
|
|
|
- costHourDM.setCtrDay(adStatOfMinuteDWD.getCtrDay().doubleValue());
|
|
|
- //小时-总平均点击率
|
|
|
- costHourDM.setCtrHour(0);
|
|
|
- //总点击均价
|
|
|
- costHourDM.setCpcAll(adStatOfMinuteDWD.getCpcAll());
|
|
|
- //天-总点击均价
|
|
|
- costHourDM.setCpcDay(adStatOfMinuteDWD.getCpcDay());
|
|
|
- //小时-总点击均价
|
|
|
- costHourDM.setCpcHour(0);
|
|
|
- //总目标转化量
|
|
|
- costHourDM.setConversionsCountTotal(adStatOfMinuteDWD.getConversionsCountTotal());
|
|
|
- //天-总目标转化量
|
|
|
- costHourDM.setConversionsCountDay(adStatOfMinuteDWD.getConversionsCountDay());
|
|
|
- //小时-总目标转化量
|
|
|
- costHourDM.setConversionsCountHour(0);
|
|
|
- //总目标平均转化成本
|
|
|
- costHourDM.setConversionsCostTotal(adStatOfMinuteDWD.getConversionsCostAll());
|
|
|
- //天-总目标平均转化成本
|
|
|
- costHourDM.setConversionsCostDay(adStatOfMinuteDWD.getConversionsCostDay());
|
|
|
- //小时-总目标平均转化成本
|
|
|
- costHourDM.setConversionsCostHour(0);
|
|
|
- //总平均转化率
|
|
|
- costHourDM.setConversionsRateAll(adStatOfMinuteDWD.getConversionsRateAll().doubleValue());
|
|
|
- //天-总平均转化率
|
|
|
- costHourDM.setConversionsRateDay(adStatOfMinuteDWD.getConversionsRateDay().doubleValue());
|
|
|
- //小时-总平均转化率
|
|
|
- costHourDM.setConversionsRateHour(0);
|
|
|
- //TODO:总首日下单roi
|
|
|
- costHourDM.setFirstDayOrderRoiTotal(0);
|
|
|
- //天-总首日下单roi
|
|
|
- costHourDM.setFirstDayOrderRoiDay(0);
|
|
|
- //小时-总首日下单roi
|
|
|
- costHourDM.setFirstDayOrderRoiHour(0);
|
|
|
- //总首日下单金额
|
|
|
- costHourDM.setFirstDayOrderAmountTotal(adStatOfMinuteDWD.getFirstDayOrderAmountTotal());
|
|
|
- //天-总首日下单金额
|
|
|
- costHourDM.setFirstDayOrderAmountDay(adStatOfMinuteDWD.getFirstDayOrderAmountDay());
|
|
|
- //小时-总首日下单金额
|
|
|
- costHourDM.setFirstDayOrderAmountHour(0);
|
|
|
- //总首日下单量
|
|
|
- costHourDM.setFirstDayOrderCountTotal(adStatOfMinuteDWD.getFirstDayOrderCountTotal());
|
|
|
- //天-总首日下单量
|
|
|
- costHourDM.setFirstDayOrderCountDay(adStatOfMinuteDWD.getFirstDayOrderCountDay());
|
|
|
- //小时-总首日下单量
|
|
|
- costHourDM.setFirstDayOrderCountHour(0);
|
|
|
- //总下单金额
|
|
|
- costHourDM.setWebOrderAmountTotal(adStatOfMinuteDWD.getOrderAmountTotal());
|
|
|
- //天-总下单金额
|
|
|
- costHourDM.setWebOrderAmountDay(adStatOfMinuteDWD.getOrderAmountDay());
|
|
|
- //小时-总下单金额
|
|
|
- costHourDM.setWebOrderAmountHour(0);
|
|
|
- //总平均下单成本
|
|
|
- costHourDM.setWebOrderCostTotal(adStatOfMinuteDWD.getWebOrderCostAll());
|
|
|
- //天-总平均下单成本
|
|
|
- costHourDM.setWebOrderCostDay(adStatOfMinuteDWD.getWebOrderCostDay());
|
|
|
- //小时-总平均下单成本
|
|
|
- costHourDM.setWebOrderCostHour(0);
|
|
|
- //总平均下单率
|
|
|
- costHourDM.setWebOrderRateTotal(adStatOfMinuteDWD.getOrderRateAll().doubleValue());
|
|
|
- //天-总平均下单率
|
|
|
- costHourDM.setWebOrderRateDay(adStatOfMinuteDWD.getOrderRateDay().doubleValue());
|
|
|
- //小时-总平均下单率
|
|
|
- costHourDM.setWebOrderRateHour(0);
|
|
|
- //TODO:总平均下单量-----webordercount和ordercount是同一个东西吗
|
|
|
- costHourDM.setWebOrderCountTotal(adStatOfMinuteDWD.getOrderCountTotal());
|
|
|
- //天-总平均下单量
|
|
|
- costHourDM.setWebOrderCountDay(adStatOfMinuteDWD.getOrderCountDay());
|
|
|
- //小时-总平均下单量
|
|
|
- costHourDM.setWebOrderCountHour(0);
|
|
|
- //总下单ROI
|
|
|
- costHourDM.setOrderRoiTotal(adStatOfMinuteDWD.getOrderRoiAll().doubleValue());
|
|
|
- //天-总下单roi
|
|
|
- costHourDM.setOrderRoiDay(adStatOfMinuteDWD.getOrderRoiDay().doubleValue());
|
|
|
- //小时-总下单roi
|
|
|
- costHourDM.setOrderRoiHour(0);
|
|
|
- //总平均下单客单价
|
|
|
- costHourDM.setOrderUnitPriceTotal(adStatOfMinuteDWD.getOrderUnitPriceAll());
|
|
|
- //天-总平均下单客单价
|
|
|
- costHourDM.setOrderUnitPriceDay(adStatOfMinuteDWD.getOrderUnitPriceDay());
|
|
|
- //小时-总平均下单客单价
|
|
|
- costHourDM.setOrderUnitPriceHour(0);
|
|
|
- //总公众号关注量
|
|
|
- costHourDM.setFromFollowUvTotal(adStatOfMinuteDWD.getOfficialAccountFollowCountTotal());
|
|
|
- //天-总公众号关注量
|
|
|
- costHourDM.setFromFollowUvDay(adStatOfMinuteDWD.getOfficialAccountFollowCountDay());
|
|
|
- //小时-总公众号关注量
|
|
|
- costHourDM.setFromFollowUvHour(0);
|
|
|
- //TODO:总平均公众号关注成本---是否是价格/关注
|
|
|
- costHourDM.setFromFollowCostTotal(adStatOfMinuteDWD.getOfficialAccountFollowCountTotal() == 0 ? 0 : adStatOfMinuteDWD.getCostTotal() / adStatOfMinuteDWD.getOfficialAccountFollowCountTotal());
|
|
|
- //天-总平均公众号关注成本
|
|
|
- costHourDM.setFromFollowCostDay(adStatOfMinuteDWD.getOfficialAccountFollowCountDay() == 0 ? 0 : adStatOfMinuteDWD.getCostDay() / adStatOfMinuteDWD.getOfficialAccountFollowCountDay());
|
|
|
- //小时-总平均公众号关注成本
|
|
|
- costHourDM.setFromFollowCostHour(0);
|
|
|
- //TODO:总平均公众号关注率----确认是否对应
|
|
|
- costHourDM.setFromFollowRateTotal(adStatOfMinuteDWD.getOfficialAccountFollowRateAll().doubleValue());
|
|
|
- //天-总平均公众号关注率
|
|
|
- costHourDM.setFromFollowRateDay(adStatOfMinuteDWD.getOfficialAccountFollowRateDay().doubleValue());
|
|
|
- //小时-总平均公众号关注率
|
|
|
- costHourDM.setFromFollowRateHour(0);
|
|
|
- //TODO:总注册数-----下面全是有问题的
|
|
|
- costHourDM.setWebRegisterCountTotal(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal());
|
|
|
- //天-总注册数
|
|
|
- costHourDM.setWebRegisterCountDay(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay());
|
|
|
- //小时-总注册数
|
|
|
- costHourDM.setWebRegisterCountHour(0);
|
|
|
- //总注册人数
|
|
|
- costHourDM.setWebRegisterUvTotal(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountTotal());
|
|
|
- //天-总注册人数
|
|
|
- costHourDM.setWebRegisterUvDay(adStatOfMinuteDWD.getOfficialAccountRegisterUserCountDay());
|
|
|
- //小时-总注册人数
|
|
|
- costHourDM.setWebRegisterUvHour(0);
|
|
|
- //总平均注册成本
|
|
|
- costHourDM.setWebRegisterCostTotal(adStatOfMinuteDWD.getOfficialAccountRegisterCostAll());
|
|
|
- //天-总平均注册成本
|
|
|
- costHourDM.setWebRegisterCostDay(adStatOfMinuteDWD.getOfficialAccountRegisterCostDay());
|
|
|
- //小时-总平均注册成本
|
|
|
- costHourDM.setWebRegisterCostHour(0);
|
|
|
-
|
|
|
- return costHourDM;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @Override
|
|
|
- public void process(Long elementCount, ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow>.Context context,
|
|
|
- Iterable<AdStatOfDayDWD> iterable, Collector<CostHourDM> collector) throws Exception {
|
|
|
-
|
|
|
- List<AdStatOfDayDWD> adStatOfMinuteDWDlist = new ArrayList<>(24);
|
|
|
-
|
|
|
- for (AdStatOfDayDWD adStatOfMinuteDWD : iterable) {
|
|
|
- adStatOfMinuteDWDlist.add(adStatOfMinuteDWD);
|
|
|
- CostHourDM costHourDM = new CostHourDM();
|
|
|
-
|
|
|
- String tmpHour = new SimpleDateFormat("yyyy-MM-dd 24:00:00").format(adStatOfMinuteDWD.getCreateTime());
|
|
|
-
|
|
|
- String sql = "select " +
|
|
|
- "cost_speed," +
|
|
|
- "cost_hour ,\n" +
|
|
|
- "view_count_hour ," +
|
|
|
- "thousand_display_price_hour ,\n" +
|
|
|
- "valid_click_count_hour ," +
|
|
|
- "ctr_hour ,\n" +
|
|
|
- "cpc_hour ," +
|
|
|
- "conversions_count_hour ,\n" +
|
|
|
- "conversions_cost_hour ," +
|
|
|
- "conversions_rate_hour ,\n" +
|
|
|
- "first_day_order_roi_hour ," +
|
|
|
- "first_day_order_amount_hour ,\n" +
|
|
|
- "first_day_order_count_hour ," +
|
|
|
- "web_order_amount_hour ,\n" +
|
|
|
- "web_order_cost_hour ," +
|
|
|
- "web_order_rate_hour ,\n" +
|
|
|
- "web_order_count_hour ," +
|
|
|
- "order_roi_hour ,\n" +
|
|
|
- "order_unit_price_hour ," +
|
|
|
- "from_follow_uv_hour ,\n" +
|
|
|
- "from_follow_cost_hour ," +
|
|
|
- "from_follow_rate_hour ,\n" +
|
|
|
- "web_register_count_hour ," +
|
|
|
- "web_register_uv_hour ," +
|
|
|
- "web_register_cost_hour,\n" +
|
|
|
- "cost_last_hour ," +
|
|
|
- "cost_last_two_hour ," +
|
|
|
- "cost_last_hour_diff ," +
|
|
|
- "cost_last_three_trend " +
|
|
|
- "from data_monitoring.cost_hour\n" +
|
|
|
- "where ad_id ='" + adStatOfMinuteDWD.getAdId() + "' \n" +
|
|
|
- "and dt='" + adStatOfMinuteDWD.getStatDay() + "' and hour='" + tmpHour + "'";
|
|
|
-
|
|
|
-
|
|
|
-// log.error(sql);
|
|
|
- Statement statement = connection.createStatement();
|
|
|
- ResultSet rs = statement.executeQuery(sql);
|
|
|
- while (rs.next()) {
|
|
|
- costHourDM.setCostSpeed(rs.getDouble(1));
|
|
|
- costHourDM.setCostHour(rs.getLong(2));
|
|
|
- costHourDM.setViewCountHour(rs.getLong(3));
|
|
|
- costHourDM.setThousandDisplayPriceHour(rs.getLong(4));
|
|
|
- costHourDM.setValidClickCountHour(rs.getLong(5));
|
|
|
- costHourDM.setCtrHour(rs.getDouble(6));
|
|
|
- costHourDM.setCpcHour(rs.getLong(7));
|
|
|
- costHourDM.setConversionsCountHour(rs.getLong(8));
|
|
|
- costHourDM.setConversionsCostHour(rs.getLong(9));
|
|
|
- costHourDM.setConversionsRateHour(rs.getDouble(10));
|
|
|
- costHourDM.setFirstDayOrderRoiHour(rs.getDouble(11));
|
|
|
- costHourDM.setFirstDayOrderAmountHour(rs.getLong(12));
|
|
|
- costHourDM.setFirstDayOrderCountHour(rs.getLong(13));
|
|
|
- costHourDM.setWebOrderAmountHour(rs.getLong(14));
|
|
|
- costHourDM.setWebOrderCostHour(rs.getLong(15));
|
|
|
- costHourDM.setWebOrderRateHour(rs.getDouble(16));
|
|
|
- costHourDM.setWebOrderCountHour(rs.getLong(17));
|
|
|
- costHourDM.setOrderRoiHour(rs.getDouble(18));
|
|
|
- costHourDM.setOrderUnitPriceHour(rs.getLong(19));
|
|
|
- costHourDM.setFromFollowUvHour(rs.getLong(20));
|
|
|
- costHourDM.setFromFollowCostHour(rs.getLong(21));
|
|
|
- costHourDM.setFromFollowRateHour(rs.getDouble(22));
|
|
|
- costHourDM.setWebRegisterCountHour(rs.getLong(23));
|
|
|
- costHourDM.setWebRegisterUvHour(rs.getLong(24));
|
|
|
- costHourDM.setWebRegisterCostHour(rs.getDouble(25));
|
|
|
- costHourDM.setCostLastHour(rs.getLong(26));
|
|
|
- costHourDM.setCostLastTwoHour(rs.getLong(27));
|
|
|
- costHourDM.setCostLastHourDiff(rs.getLong(28));
|
|
|
- costHourDM.setCostLastThreeTrend(rs.getLong(29));
|
|
|
- }
|
|
|
- CostHourDM costHourDM_new = datachange(adStatOfMinuteDWD, costHourDM);
|
|
|
-
|
|
|
- collector.collect(costHourDM_new);
|
|
|
-// log.error("costhour_输出:" + JsonUtil.toString(costHourDM_new));
|
|
|
- }
|
|
|
-// log.error("costhour_windowCount:" + adStatOfMinuteDWDlist.size());
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void clear(ProcessWindowFunction<AdStatOfDayDWD, CostHourDM, Long, GlobalWindow>.Context context) throws Exception {
|
|
|
- }
|
|
|
-
|
|
|
-}
|