|
@@ -20,6 +20,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
|
import org.apache.flink.streaming.api.environment.CheckpointConfig;
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.math.RoundingMode;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Properties;
|
|
@@ -129,6 +131,16 @@ public class ByteDailyCost {
|
|
|
|
|
|
after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
|
|
|
|
|
|
+ BigDecimal cost = after.getBigDecimal("cost");
|
|
|
+
|
|
|
+// BigDecimal divide = cost.divide(new BigDecimal(100), 6, RoundingMode.HALF_UP);
|
|
|
+
|
|
|
+ BigDecimal multiply = cost.multiply(new BigDecimal(100));
|
|
|
+
|
|
|
+ after.remove("cost");
|
|
|
+
|
|
|
+ after.put("cost",multiply);
|
|
|
+
|
|
|
keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
|
|
|
|
|
|
afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
|
|
@@ -136,7 +148,6 @@ public class ByteDailyCost {
|
|
|
.replaceAll("\"click\":", "\"valid_click_count\":")
|
|
|
.replaceAll("\"active\":", "\"from_follow_uv\":");
|
|
|
|
|
|
-
|
|
|
tableNameFinal = "daily_tt";
|
|
|
}
|
|
|
|