|
@@ -27,6 +27,7 @@ import java.time.LocalDateTime;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -62,6 +63,16 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
new ThreadFactoryBuilder()
|
|
|
.setNameFormat("sync-ipDataAssay-%d").build());
|
|
|
|
|
|
+ private static final ThreadPoolExecutor THREAD_POOL_IP_DATA_ASSAY_QUEUE = new ThreadPoolExecutor(
|
|
|
+ 10,
|
|
|
+ 10,
|
|
|
+ 0,
|
|
|
+ TimeUnit.MINUTES,
|
|
|
+ new LinkedBlockingQueue<>(),
|
|
|
+ new ThreadFactoryBuilder()
|
|
|
+ .setNameFormat("sync-ipDataAssayQueue-%d").build());
|
|
|
+
|
|
|
+
|
|
|
@Override
|
|
|
public void userHandleByIds(String ids) {
|
|
|
List<Long> userIds = Arrays.stream(ids.split(",")).map(Long::valueOf).collect(Collectors.toList());
|
|
@@ -71,7 +82,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
}
|
|
|
userList.forEach(user -> {
|
|
|
String ipData = this.ipAssay(user.getIp());
|
|
|
- log.error("IP解析返回的结果, ipData : {}", ipData);
|
|
|
+ log.error("用户IP解析返回的结果, ipData : {}", ipData);
|
|
|
if (Strings.isBlank(ipData)) {
|
|
|
return;
|
|
|
}
|
|
@@ -112,7 +123,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
log.error("用户单页数据为空, 不处理");
|
|
|
return;
|
|
|
}
|
|
|
- THREAD_POOL_IP_DATA_ASSAY.execute(() -> userList.forEach(user -> {
|
|
|
+ List<CompletableFuture<?>> futures = userList.stream().map(user -> CompletableFuture.runAsync(() -> {
|
|
|
if (Strings.isNotBlank(user.getIpData())) {
|
|
|
return;
|
|
|
}
|
|
@@ -124,7 +135,31 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
userService.update(new LambdaUpdateWrapper<User>()
|
|
|
.set(User::getIpData, ipData)
|
|
|
.eq(User::getId, user.getId()));
|
|
|
- }));
|
|
|
+ }, THREAD_POOL_IP_DATA_ASSAY)).collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("等待所有用户处理完成时出现异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void userLoginLogHandleByIds(String ids) {
|
|
|
+ List<Long> userLoginLogIds = Arrays.stream(ids.split(",")).map(Long::valueOf).collect(Collectors.toList());
|
|
|
+ List<UserLoginLog> userLoginLogList = userLoginLogService.listByIds(userLoginLogIds);
|
|
|
+ if (CollectionUtils.isEmpty(userLoginLogList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ userLoginLogList.forEach(userLoginLog -> {
|
|
|
+ String ipData = this.ipAssay(userLoginLog.getIp());
|
|
|
+ log.error("登录日志IP解析返回的结果, ipData : {}", ipData);
|
|
|
+ if (Strings.isBlank(ipData)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
|
|
|
+ .set(UserLoginLog::getIpData, ipData)
|
|
|
+ .eq(UserLoginLog::getId, userLoginLog.getId()));
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -151,7 +186,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
log.error("登录日志单页数据为空, 不处理");
|
|
|
return;
|
|
|
}
|
|
|
- THREAD_POOL_IP_DATA_ASSAY.execute(() -> userLoginLogList.forEach(userLoginLog -> {
|
|
|
+ List<CompletableFuture<?>> futures = userLoginLogList.stream().map(userLoginLog -> CompletableFuture.runAsync(() -> {
|
|
|
if (Strings.isNotBlank(userLoginLog.getIpData())) {
|
|
|
return;
|
|
|
}
|
|
@@ -163,17 +198,23 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
|
|
|
.set(UserLoginLog::getIpData, ipData)
|
|
|
.eq(UserLoginLog::getId, userLoginLog.getId()));
|
|
|
- }));
|
|
|
+ }, THREAD_POOL_IP_DATA_ASSAY)).collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("等待所有登录日志完成时出现异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void ipDataHandle(List<String> msgList) {
|
|
|
- msgList.forEach(msg -> {
|
|
|
+ List<CompletableFuture<?>> futures = msgList.stream().map(msg -> CompletableFuture.runAsync(() -> {
|
|
|
+ //数据解析, 消息类型(注册还是登录), 数据主键, ip地址
|
|
|
String[] array = msg.split(":");
|
|
|
- //消息类型(注册还是登录), 数据主键, ip地址
|
|
|
Tuple3<String, String, String> tuple3 = Tuple3.with(array[0], array[1], array[2]);
|
|
|
String ipData = this.ipAssay(tuple3.third);
|
|
|
if (Strings.isBlank(ipData)) {
|
|
|
+ log.error("队列消息IP解析结果返回为空, ip : {}", tuple3.third);
|
|
|
return;
|
|
|
}
|
|
|
//注册用户, t_user表
|
|
@@ -190,7 +231,12 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
.eq(UserLoginLog::getId, Long.parseLong(tuple3.second))
|
|
|
);
|
|
|
}
|
|
|
- });
|
|
|
+ }, THREAD_POOL_IP_DATA_ASSAY_QUEUE)).collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("等待所有队列消息消费完成时出现异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private String ipAssay(String ip) {
|