|
@@ -259,7 +259,7 @@ public class PlanStatJob {
|
|
private MapState<String, PlanStatOfDayDWD> historyReduceState;
|
|
private MapState<String, PlanStatOfDayDWD> historyReduceState;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void open(Configuration conf) {
|
|
|
|
|
|
+ public void open(Configuration conf) throws Exception {
|
|
Map<String, String> params = getRuntimeContext()
|
|
Map<String, String> params = getRuntimeContext()
|
|
.getExecutionConfig()
|
|
.getExecutionConfig()
|
|
.getGlobalJobParameters()
|
|
.getGlobalJobParameters()
|
|
@@ -278,6 +278,9 @@ public class PlanStatJob {
|
|
@Override
|
|
@Override
|
|
public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
|
|
public void process(Long elementsCount, ProcessWindowFunction<AdDataOfDayODS, PlanStatOfDayDWD, Long, GlobalWindow>.Context context,
|
|
Iterable<AdDataOfDayODS> iterable, Collector<PlanStatOfDayDWD> collector) throws Exception {
|
|
Iterable<AdDataOfDayODS> iterable, Collector<PlanStatOfDayDWD> collector) throws Exception {
|
|
|
|
+ for (String key : historyReduceState.keys()) {
|
|
|
|
+ PlanStatOfDayDWD newStatData = historyReduceState.get(key);
|
|
|
|
+ }
|
|
AdDataOfDayODS element = iterable.iterator().next();
|
|
AdDataOfDayODS element = iterable.iterator().next();
|
|
LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
|
|
LocalDate statDay = DateUtil.parseLocalDate(element.getStatDay());
|
|
long now = System.currentTimeMillis();
|
|
long now = System.currentTimeMillis();
|