|
@@ -0,0 +1,142 @@
|
|
|
+package flink.zanxiangnet.ad.monitoring.sink;
|
|
|
+
|
|
|
+import com.aliyun.odps.Odps;
|
|
|
+import com.aliyun.odps.account.Account;
|
|
|
+import com.aliyun.odps.account.AliyunAccount;
|
|
|
+import com.aliyun.odps.data.Record;
|
|
|
+import com.aliyun.odps.tunnel.TableTunnel;
|
|
|
+import com.aliyun.odps.tunnel.TunnelException;
|
|
|
+import flink.zanxiangnet.ad.monitoring.maxcompute.MaxComputeLog;
|
|
|
+import flink.zanxiangnet.ad.monitoring.maxcompute.bean.BeanUtil;
|
|
|
+import flink.zanxiangnet.ad.monitoring.maxcompute.bean.annotation.MaxComputeTable;
|
|
|
+import flink.zanxiangnet.ad.monitoring.pojo.properties.ApplicationProperties;
|
|
|
+import org.apache.flink.configuration.Configuration;
|
|
|
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
+import java.lang.reflect.Method;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 该类有严重 bug,请勿使用
|
|
|
+ *
|
|
|
+ * @param <IN>
|
|
|
+ */
|
|
|
+public class TunnelBatchStreamSink<T, IN extends List<T>> extends RichSinkFunction<IN> {
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(TunnelBatchStreamSink.class);
|
|
|
+
|
|
|
+ // 对象锁,防止MaxCompute的 Tunnel对象多次初始化
|
|
|
+ private static final Object DUMMY_LOCK = new Object();
|
|
|
+
|
|
|
+ private final Class<T> clazz;
|
|
|
+ private String projectName;
|
|
|
+ private String tableName;
|
|
|
+
|
|
|
+ private volatile transient TableTunnel tunnel;
|
|
|
+ private volatile transient List<BeanUtil.FieldInfo> fieldInfoList;
|
|
|
+ private volatile transient Map<String, Method> partitionFieldMethods;
|
|
|
+
|
|
|
+ public TunnelBatchStreamSink(Class<T> clazz) {
|
|
|
+ this.clazz = clazz;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void open(Configuration config) {
|
|
|
+ if (tunnel == null) {
|
|
|
+ synchronized (DUMMY_LOCK) {
|
|
|
+ if (tunnel == null) {
|
|
|
+ Map<String, String> params = getRuntimeContext()
|
|
|
+ .getExecutionConfig()
|
|
|
+ .getGlobalJobParameters()
|
|
|
+ .toMap();
|
|
|
+ MaxComputeTable tableAnnotation = clazz.getAnnotation(MaxComputeTable.class);
|
|
|
+
|
|
|
+ Account account = new AliyunAccount(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ID),
|
|
|
+ params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_KEY));
|
|
|
+ Odps odps = new Odps(account);
|
|
|
+ odps.getRestClient().setRetryLogger(new MaxComputeLog());
|
|
|
+ odps.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_ENDPOINT));
|
|
|
+ odps.setDefaultProject(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME));
|
|
|
+ tunnel = new TableTunnel(odps);
|
|
|
+ tunnel.setEndpoint(params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_TUNNEL_ENDPOINT));
|
|
|
+ projectName = params.get(ApplicationProperties.MAX_COMPUTE_ACCOUNT_PROJECT_NAME);
|
|
|
+ tableName = tableAnnotation.value();
|
|
|
+ fieldInfoList = BeanUtil.parseBeanField(clazz);
|
|
|
+ partitionFieldMethods = fieldInfoList.stream().filter(BeanUtil.FieldInfo::isUsePartitioned).collect(Collectors.toMap(BeanUtil.FieldInfo::getColumnName, BeanUtil.FieldInfo::getGetMethod));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将值写入到 Sink。每个值都会调用此函数
|
|
|
+ *
|
|
|
+ * @param value
|
|
|
+ * @param context
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void invoke(IN value, Context context) throws TunnelException, IOException, InvocationTargetException, IllegalAccessException {
|
|
|
+ T element = value.get(0);
|
|
|
+ String partitionStr = generatePartitionStr(element);
|
|
|
+ TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(projectName, tableName, partitionStr);
|
|
|
+ TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();
|
|
|
+ for (T t : value) {
|
|
|
+ Record record = uploadSession.newRecord();
|
|
|
+ for (BeanUtil.FieldInfo fieldInfo : fieldInfoList) {
|
|
|
+ if (fieldInfo.isUsePartitioned()) {
|
|
|
+ // 分区字段不在这里设值
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ Object obj = fieldInfo.getGetMethod().invoke(t);
|
|
|
+ record.set(fieldInfo.getColumnName(), obj);
|
|
|
+ }
|
|
|
+ // append只是写入内存
|
|
|
+ pack.append(record);
|
|
|
+ }
|
|
|
+ System.out.println("写入数据==》" + value.size());
|
|
|
+ int retry = 0;
|
|
|
+ do {
|
|
|
+ try {
|
|
|
+ // 大概用时 100ms ~ 3s
|
|
|
+ pack.flush();
|
|
|
+ break;
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (retry == 3) {
|
|
|
+ log.error("Flush data error!msg: " + e.getMessage());
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } while (retry++ < 3);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws Exception {
|
|
|
+ super.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private String generatePartitionStr(T t) {
|
|
|
+ if (CollectionUtils.isEmpty(partitionFieldMethods)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ StringBuilder partition = new StringBuilder();
|
|
|
+ for (Map.Entry<String, Method> entry : partitionFieldMethods.entrySet()) {
|
|
|
+ partition.append(entry.getKey()).append("=");
|
|
|
+ try {
|
|
|
+ partition.append(entry.getValue().invoke(t));
|
|
|
+ } catch (InvocationTargetException | IllegalAccessException e) {
|
|
|
+ // 获取分区字段的值失败
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ throw new RuntimeException("Failed get partition field value!");
|
|
|
+ }
|
|
|
+ partition.append(",");
|
|
|
+ }
|
|
|
+ partition = new StringBuilder(partition.substring(0, partition.length() - 1));
|
|
|
+ return partition.toString();
|
|
|
+ }
|
|
|
+}
|