|
@@ -0,0 +1,197 @@
|
|
|
|
+package flink.zanxiangnet.ad.monitoring.process;
|
|
|
|
+
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.entity.AdStatOfHourDWD;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.entity.PlanStatOfHourDWD;
|
|
|
|
+import flink.zanxiangnet.ad.monitoring.util.DateUtil;
|
|
|
|
+import org.apache.flink.api.common.state.ValueState;
|
|
|
|
+import org.apache.flink.api.common.state.ValueStateDescriptor;
|
|
|
|
+import org.apache.flink.api.common.typeinfo.Types;
|
|
|
|
+import org.apache.flink.configuration.Configuration;
|
|
|
|
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
|
|
|
+import org.apache.flink.util.Collector;
|
|
|
|
+import org.springframework.beans.BeanUtils;
|
|
|
|
+
|
|
|
|
+import java.time.LocalDateTime;
|
|
|
|
+import java.time.LocalTime;
|
|
|
|
+import java.util.Date;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * 小时流数据补全,把中间没消耗的时间段填充 0
|
|
|
|
+ */
|
|
|
|
+public class PlanHourStreamCompletionProcess extends KeyedProcessFunction<Long, PlanStatOfHourDWD, PlanStatOfHourDWD> {
|
|
|
|
+
|
|
|
|
+ private ValueState<PlanStatOfHourDWD> lastReduceState;
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void open(Configuration conf) {
|
|
|
|
+ lastReduceState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastReduceState", Types.POJO(PlanStatOfHourDWD.class)));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void processElement(PlanStatOfHourDWD planStatOfHourDWD, KeyedProcessFunction<Long, PlanStatOfHourDWD, PlanStatOfHourDWD>.Context context,
|
|
|
|
+ Collector<PlanStatOfHourDWD> collector) throws Exception {
|
|
|
|
+ PlanStatOfHourDWD lastReduce = lastReduceState.value();
|
|
|
|
+ if (lastReduce == null) {
|
|
|
|
+ lastReduceState.update(planStatOfHourDWD);
|
|
|
|
+ collector.collect(planStatOfHourDWD);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ LocalDateTime statDateTime = LocalDateTime.of(DateUtil.parseLocalDate(planStatOfHourDWD.getStatDay()), LocalTime.of(planStatOfHourDWD.getHour(), 0, 0));
|
|
|
|
+ LocalDateTime lastStatDateTime = LocalDateTime.of(DateUtil.parseLocalDate(lastReduce.getStatDay()), LocalTime.of(lastReduce.getHour(), 0, 0));
|
|
|
|
+ if (lastStatDateTime.compareTo(statDateTime) >= 0) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ long hours = DateUtil.intervalOfHour(lastStatDateTime, statDateTime);
|
|
|
|
+ if (hours > 1) {
|
|
|
|
+ // 中间有没数据的时间段,需要进行数据填充
|
|
|
|
+ for (int i = 1; i < hours; i++) {
|
|
|
|
+ // 要填充的时间
|
|
|
|
+ LocalDateTime completionTime = lastStatDateTime.plusHours(i);
|
|
|
|
+ collector.collect(completion(completionTime, lastReduce));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ collector.collect(planStatOfHourDWD);
|
|
|
|
+ lastReduceState.update(planStatOfHourDWD);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private PlanStatOfHourDWD completion(LocalDateTime completionTime, PlanStatOfHourDWD lastReduceData) {
|
|
|
|
+ String statDay = DateUtil.formatLocalDate(completionTime.toLocalDate());
|
|
|
|
+ int hour = completionTime.getHour();
|
|
|
|
+ PlanStatOfHourDWD result = new PlanStatOfHourDWD();
|
|
|
|
+ BeanUtils.copyProperties(lastReduceData, result);
|
|
|
|
+ result.setCreateTime(new Date());
|
|
|
|
+ result.setStatDay(statDay);
|
|
|
|
+ result.setHour(hour);
|
|
|
|
+ PlanStatOfHourDWD.initValue(result);
|
|
|
|
+
|
|
|
|
+ result.setCostDeviationRateTotal(lastReduceData.getCostDeviationRateTotal());
|
|
|
|
+ result.setCostTotal(lastReduceData.getCostTotal());
|
|
|
|
+ result.setCompensationAmountTotal(lastReduceData.getCompensationAmountTotal());
|
|
|
|
+ result.setViewCountTotal(lastReduceData.getViewCountTotal());
|
|
|
|
+ result.setThousandDisplayPriceAll(lastReduceData.getThousandDisplayPriceAll());
|
|
|
|
+ result.setValidClickCountTotal(lastReduceData.getValidClickCountTotal());
|
|
|
|
+ result.setCtrAll(lastReduceData.getCtrAll());
|
|
|
|
+ result.setCpcAll(lastReduceData.getCpcAll());
|
|
|
|
+ result.setValuableClickCountTotal(lastReduceData.getValuableClickCountTotal());
|
|
|
|
+ result.setValuableClickRateAll(lastReduceData.getValuableClickRateAll());
|
|
|
|
+ result.setValuableClickCostAll(lastReduceData.getValuableClickCostAll());
|
|
|
|
+ result.setConversionsCountTotal(lastReduceData.getConversionsCountTotal());
|
|
|
|
+ result.setConversionsCostAll(lastReduceData.getConversionsCostAll());
|
|
|
|
+ result.setConversionsRateAll(lastReduceData.getConversionsRateAll());
|
|
|
|
+ result.setDeepConversionsCountTotal(lastReduceData.getDeepConversionsCountTotal());
|
|
|
|
+ result.setDeepConversionsCostAll(lastReduceData.getDeepConversionsCostAll());
|
|
|
|
+ result.setDeepConversionsRateAll(lastReduceData.getDeepConversionsRateAll());
|
|
|
|
+ result.setOrderCountTotal(lastReduceData.getOrderCountTotal());
|
|
|
|
+ result.setFirstDayOrderCountTotal(lastReduceData.getFirstDayOrderCountTotal());
|
|
|
|
+ result.setWebOrderCostAll(lastReduceData.getWebOrderCostAll());
|
|
|
|
+ result.setOrderRateAll(lastReduceData.getOrderRateAll());
|
|
|
|
+ result.setOrderAmountTotal(lastReduceData.getOrderAmountTotal());
|
|
|
|
+ result.setFirstDayOrderAmountTotal(lastReduceData.getFirstDayOrderAmountTotal());
|
|
|
|
+ result.setOrderUnitPriceAll(lastReduceData.getOrderUnitPriceAll());
|
|
|
|
+ result.setOrderRoiAll(lastReduceData.getOrderRoiAll());
|
|
|
|
+ result.setSignInCountTotal(lastReduceData.getSignInCountTotal());
|
|
|
|
+ result.setScanFollowCountTotal(lastReduceData.getScanFollowCountTotal());
|
|
|
|
+ result.setWechatAppRegisterUvTotal(lastReduceData.getWechatAppRegisterUvTotal());
|
|
|
|
+ result.setWechatMinigameRegisterCostAll(lastReduceData.getWechatMinigameRegisterCostAll());
|
|
|
|
+ result.setWechatMinigameRegisterRateAll(lastReduceData.getWechatMinigameRegisterRateAll());
|
|
|
|
+ result.setWechatMinigameArpuAll(lastReduceData.getWechatMinigameArpuAll());
|
|
|
|
+ result.setWechatMinigameRetentionCountTotal(lastReduceData.getWechatMinigameRetentionCountTotal());
|
|
|
|
+ result.setWechatMinigameCheckoutCountTotal(lastReduceData.getWechatMinigameCheckoutCountTotal());
|
|
|
|
+ result.setWechatMinigameCheckoutAmountTotal(lastReduceData.getWechatMinigameCheckoutAmountTotal());
|
|
|
|
+ result.setOfficialAccountFollowCountTotal(lastReduceData.getOfficialAccountFollowCountTotal());
|
|
|
|
+ result.setOfficialAccountFollowRateAll(lastReduceData.getOfficialAccountFollowRateAll());
|
|
|
|
+ result.setOfficialAccountRegisterUserCountTotal(lastReduceData.getOfficialAccountRegisterUserCountTotal());
|
|
|
|
+ result.setOfficialAccountRegisterRateAll(lastReduceData.getOfficialAccountRegisterRateAll());
|
|
|
|
+ result.setOfficialAccountRegisterCostAll(lastReduceData.getOfficialAccountRegisterCostAll());
|
|
|
|
+ result.setOfficialAccountRegisterAmountTotal(lastReduceData.getOfficialAccountRegisterAmountTotal());
|
|
|
|
+ result.setOfficialAccountRegisterRoiAll(lastReduceData.getOfficialAccountRegisterRoiAll());
|
|
|
|
+ result.setOfficialAccountApplyCountTotal(lastReduceData.getOfficialAccountApplyCountTotal());
|
|
|
|
+ result.setOfficialAccountApplyUserCountTotal(lastReduceData.getOfficialAccountApplyUserCountTotal());
|
|
|
|
+ result.setOfficialAccountApplyRateAll(lastReduceData.getOfficialAccountApplyRateAll());
|
|
|
|
+ result.setOfficialAccountApplyCostAll(lastReduceData.getOfficialAccountApplyCostAll());
|
|
|
|
+ result.setOfficialAccountApplyAmountTotal(lastReduceData.getOfficialAccountApplyAmountTotal());
|
|
|
|
+ result.setOfficialAccountApplyRoiAll(lastReduceData.getOfficialAccountApplyRoiAll());
|
|
|
|
+ result.setOfficialAccountOrderCountTotal(lastReduceData.getOfficialAccountOrderCountTotal());
|
|
|
|
+ result.setOfficialAccountFirstDayOrderCountTotal(lastReduceData.getOfficialAccountFirstDayOrderCountTotal());
|
|
|
|
+ result.setOfficialAccountOrderUserCountTotal(lastReduceData.getOfficialAccountOrderUserCountTotal());
|
|
|
|
+ result.setOfficialAccountOrderRateAll(lastReduceData.getOfficialAccountOrderRateAll());
|
|
|
|
+ result.setOfficialAccountOrderCostAll(lastReduceData.getOfficialAccountOrderCostAll());
|
|
|
|
+ result.setOfficialAccountOrderAmountTotal(lastReduceData.getOfficialAccountOrderAmountTotal());
|
|
|
|
+ result.setOfficialAccountFirstDayOrderAmountTotal(lastReduceData.getOfficialAccountFirstDayOrderAmountTotal());
|
|
|
|
+ result.setOfficialAccountOrderRoiAll(lastReduceData.getOfficialAccountOrderRoiAll());
|
|
|
|
+ result.setOfficialAccountConsultCountTotal(lastReduceData.getOfficialAccountConsultCountTotal());
|
|
|
|
+ result.setOfficialAccountReaderCountTotal(lastReduceData.getOfficialAccountReaderCountTotal());
|
|
|
|
+ result.setOfficialAccountCreditApplyUserCountTotal(lastReduceData.getOfficialAccountCreditApplyUserCountTotal());
|
|
|
|
+ result.setOfficialAccountCreditUserCountTotal(lastReduceData.getOfficialAccountCreditUserCountTotal());
|
|
|
|
+ result.setForwardCountTotal(lastReduceData.getForwardCountTotal());
|
|
|
|
+ result.setForwardUserCountTotal(lastReduceData.getForwardUserCountTotal());
|
|
|
|
+ result.setNoInterestCountTotal(lastReduceData.getNoInterestCountTotal());
|
|
|
|
+ result.setNoInterestCountHour(lastReduceData.getNoInterestCountHour());
|
|
|
|
+
|
|
|
|
+ if (statDay.equals(lastReduceData.getStatDay())) {
|
|
|
|
+ result.setCostDeviationRateDay(lastReduceData.getCostDeviationRateDay());
|
|
|
|
+ result.setCostDay(lastReduceData.getCostDay());
|
|
|
|
+ result.setCompensationAmountDay(lastReduceData.getCompensationAmountDay());
|
|
|
|
+ result.setViewCountDay(lastReduceData.getViewCountDay());
|
|
|
|
+ result.setThousandDisplayPriceDay(lastReduceData.getThousandDisplayPriceDay());
|
|
|
|
+ result.setValidClickCountDay(lastReduceData.getValidClickCountDay());
|
|
|
|
+ result.setCtrDay(lastReduceData.getCtrDay());
|
|
|
|
+ result.setCpcDay(lastReduceData.getCpcDay());
|
|
|
|
+ result.setValuableClickCountDay(lastReduceData.getValuableClickCountDay());
|
|
|
|
+ result.setValuableClickRateDay(lastReduceData.getValuableClickRateDay());
|
|
|
|
+ result.setValuableClickCostDay(lastReduceData.getValuableClickCostDay());
|
|
|
|
+ result.setConversionsCountDay(lastReduceData.getConversionsCountDay());
|
|
|
|
+ result.setConversionsCostDay(lastReduceData.getConversionsCostDay());
|
|
|
|
+ result.setConversionsRateDay(lastReduceData.getConversionsRateDay());
|
|
|
|
+ result.setDeepConversionsCountDay(lastReduceData.getDeepConversionsCountDay());
|
|
|
|
+ result.setDeepConversionsCostDay(lastReduceData.getDeepConversionsCostDay());
|
|
|
|
+ result.setDeepConversionsRateDay(lastReduceData.getDeepConversionsRateDay());
|
|
|
|
+ result.setOrderCountDay(lastReduceData.getOrderCountDay());
|
|
|
|
+ result.setFirstDayOrderCountDay(lastReduceData.getFirstDayOrderCountDay());
|
|
|
|
+ result.setWebOrderCostDay(lastReduceData.getWebOrderCostDay());
|
|
|
|
+ result.setOrderRateDay(lastReduceData.getOrderRateDay());
|
|
|
|
+ result.setOrderAmountDay(lastReduceData.getOrderAmountDay());
|
|
|
|
+ result.setFirstDayOrderAmountDay(lastReduceData.getFirstDayOrderAmountDay());
|
|
|
|
+ result.setOrderUnitPriceDay(lastReduceData.getOrderUnitPriceDay());
|
|
|
|
+ result.setOrderRoiDay(lastReduceData.getOrderRoiDay());
|
|
|
|
+ result.setSignInCountDay(lastReduceData.getSignInCountDay());
|
|
|
|
+ result.setScanFollowCountDay(lastReduceData.getScanFollowCountDay());
|
|
|
|
+ result.setWechatAppRegisterUvDay(lastReduceData.getWechatAppRegisterUvDay());
|
|
|
|
+ result.setWechatMinigameRegisterCostDay(lastReduceData.getWechatMinigameRegisterCostDay());
|
|
|
|
+ result.setWechatMinigameRegisterRateDay(lastReduceData.getWechatMinigameRegisterRateDay());
|
|
|
|
+ result.setWechatMinigameArpuDay(lastReduceData.getWechatMinigameArpuDay());
|
|
|
|
+ result.setWechatMinigameRetentionCountDay(lastReduceData.getWechatMinigameRetentionCountDay());
|
|
|
|
+ result.setWechatMinigameCheckoutCountDay(lastReduceData.getWechatMinigameCheckoutCountDay());
|
|
|
|
+ result.setWechatMinigameCheckoutAmountDay(lastReduceData.getWechatMinigameCheckoutAmountDay());
|
|
|
|
+ result.setOfficialAccountFollowCountDay(lastReduceData.getOfficialAccountFollowCountDay());
|
|
|
|
+ result.setOfficialAccountFollowRateDay(lastReduceData.getOfficialAccountFollowRateDay());
|
|
|
|
+ result.setOfficialAccountRegisterUserCountDay(lastReduceData.getOfficialAccountRegisterUserCountDay());
|
|
|
|
+ result.setOfficialAccountRegisterRateDay(lastReduceData.getOfficialAccountRegisterRateDay());
|
|
|
|
+ result.setOfficialAccountRegisterCostDay(lastReduceData.getOfficialAccountRegisterCostDay());
|
|
|
|
+ result.setOfficialAccountRegisterAmountDay(lastReduceData.getOfficialAccountRegisterAmountDay());
|
|
|
|
+ result.setOfficialAccountRegisterRoiDay(lastReduceData.getOfficialAccountRegisterRoiDay());
|
|
|
|
+ result.setOfficialAccountApplyCountDay(lastReduceData.getOfficialAccountApplyCountDay());
|
|
|
|
+ result.setOfficialAccountApplyUserCountDay(lastReduceData.getOfficialAccountApplyUserCountDay());
|
|
|
|
+ result.setOfficialAccountApplyRateDay(lastReduceData.getOfficialAccountApplyRateDay());
|
|
|
|
+ result.setOfficialAccountApplyCostDay(lastReduceData.getOfficialAccountApplyCostDay());
|
|
|
|
+ result.setOfficialAccountApplyAmountDay(lastReduceData.getOfficialAccountApplyAmountDay());
|
|
|
|
+ result.setOfficialAccountApplyRoiDay(lastReduceData.getOfficialAccountApplyRoiDay());
|
|
|
|
+ result.setOfficialAccountOrderCountDay(lastReduceData.getOfficialAccountOrderCountDay());
|
|
|
|
+ result.setOfficialAccountFirstDayOrderCountDay(lastReduceData.getOfficialAccountFirstDayOrderCountDay());
|
|
|
|
+ result.setOfficialAccountOrderUserCountDay(lastReduceData.getOfficialAccountOrderUserCountDay());
|
|
|
|
+ result.setOfficialAccountOrderRateDay(lastReduceData.getOfficialAccountOrderRateDay());
|
|
|
|
+ result.setOfficialAccountOrderCostDay(lastReduceData.getOfficialAccountOrderCostDay());
|
|
|
|
+ result.setOfficialAccountOrderAmountDay(lastReduceData.getOfficialAccountOrderAmountDay());
|
|
|
|
+ result.setOfficialAccountFirstDayOrderAmountDay(lastReduceData.getOfficialAccountFirstDayOrderAmountDay());
|
|
|
|
+ result.setOfficialAccountOrderRoiDay(lastReduceData.getOfficialAccountOrderRoiDay());
|
|
|
|
+ result.setOfficialAccountConsultCountDay(lastReduceData.getOfficialAccountConsultCountDay());
|
|
|
|
+ result.setOfficialAccountReaderCountDay(lastReduceData.getOfficialAccountReaderCountDay());
|
|
|
|
+ result.setOfficialAccountCreditApplyUserCountDay(lastReduceData.getOfficialAccountCreditApplyUserCountDay());
|
|
|
|
+ result.setOfficialAccountCreditUserCountDay(lastReduceData.getOfficialAccountCreditUserCountDay());
|
|
|
|
+ result.setForwardCountDay(lastReduceData.getForwardCountDay());
|
|
|
|
+ result.setForwardUserCountDay(lastReduceData.getForwardUserCountDay());
|
|
|
|
+ result.setNoInterestCountDay(lastReduceData.getNoInterestCountDay());
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+}
|