|
@@ -8,10 +8,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
import com.zanxiang.erp.base.ErpServer;
|
|
|
import com.zanxiang.erp.base.pojo.vo.SysIpv4RpcVO;
|
|
|
import com.zanxiang.erp.base.rpc.ISysIpv4Rpc;
|
|
|
+import com.zanxiang.game.module.manage.constant.RedisKeyConstant;
|
|
|
import com.zanxiang.game.module.manage.service.IGameService;
|
|
|
import com.zanxiang.game.module.manage.service.IIpDataAssayService;
|
|
|
import com.zanxiang.game.module.manage.service.IUserLoginLogService;
|
|
|
import com.zanxiang.game.module.manage.service.IUserService;
|
|
|
+import com.zanxiang.game.module.manage.utils.RedisUtil;
|
|
|
import com.zanxiang.game.module.mybatis.entity.Game;
|
|
|
import com.zanxiang.game.module.mybatis.entity.User;
|
|
|
import com.zanxiang.game.module.mybatis.entity.UserLoginLog;
|
|
@@ -27,6 +29,7 @@ import java.time.LocalDateTime;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -44,6 +47,9 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
@DubboReference(providedBy = ErpServer.SERVER_DUBBO_NAME)
|
|
|
private ISysIpv4Rpc sysIpv4Rpc;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private RedisUtil<String> redisUtil;
|
|
|
+
|
|
|
@Autowired
|
|
|
private IGameService gameService;
|
|
|
|
|
@@ -62,6 +68,16 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
new ThreadFactoryBuilder()
|
|
|
.setNameFormat("sync-ipDataAssay-%d").build());
|
|
|
|
|
|
+ private static final ThreadPoolExecutor THREAD_POOL_IP_DATA_ASSAY_QUEUE = new ThreadPoolExecutor(
|
|
|
+ 10,
|
|
|
+ 10,
|
|
|
+ 0,
|
|
|
+ TimeUnit.MINUTES,
|
|
|
+ new LinkedBlockingQueue<>(),
|
|
|
+ new ThreadFactoryBuilder()
|
|
|
+ .setNameFormat("sync-ipDataAssayQueue-%d").build());
|
|
|
+
|
|
|
+
|
|
|
@Override
|
|
|
public void userHandleByIds(String ids) {
|
|
|
List<Long> userIds = Arrays.stream(ids.split(",")).map(Long::valueOf).collect(Collectors.toList());
|
|
@@ -71,7 +87,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
}
|
|
|
userList.forEach(user -> {
|
|
|
String ipData = this.ipAssay(user.getIp());
|
|
|
- log.error("IP解析返回的结果, ipData : {}", ipData);
|
|
|
+ log.error("用户IP解析返回的结果, ipData : {}", ipData);
|
|
|
if (Strings.isBlank(ipData)) {
|
|
|
return;
|
|
|
}
|
|
@@ -101,7 +117,9 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
.in(User::getGameId, gameIdList)
|
|
|
.orderByDesc(User::getCreateTime));
|
|
|
totalPage = pageUser.getPages();
|
|
|
- userList = pageUser.getRecords();
|
|
|
+ userList = pageUser.getRecords().stream()
|
|
|
+ .filter(user -> Strings.isBlank(user.getIpData()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
log.error("用户执行页, page : {}, totalPage : {}, size : {}", page, totalPage, userList.size());
|
|
|
this.pageUserHandle(userList);
|
|
|
} while (++page <= totalPage);
|
|
@@ -112,7 +130,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
log.error("用户单页数据为空, 不处理");
|
|
|
return;
|
|
|
}
|
|
|
- THREAD_POOL_IP_DATA_ASSAY.execute(() -> userList.forEach(user -> {
|
|
|
+ List<CompletableFuture<?>> futures = userList.stream().map(user -> CompletableFuture.runAsync(() -> {
|
|
|
if (Strings.isNotBlank(user.getIpData())) {
|
|
|
return;
|
|
|
}
|
|
@@ -124,7 +142,31 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
userService.update(new LambdaUpdateWrapper<User>()
|
|
|
.set(User::getIpData, ipData)
|
|
|
.eq(User::getId, user.getId()));
|
|
|
- }));
|
|
|
+ }, THREAD_POOL_IP_DATA_ASSAY)).collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("等待所有用户处理完成时出现异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void userLoginLogHandleByIds(String ids) {
|
|
|
+ List<Long> userLoginLogIds = Arrays.stream(ids.split(",")).map(Long::valueOf).collect(Collectors.toList());
|
|
|
+ List<UserLoginLog> userLoginLogList = userLoginLogService.listByIds(userLoginLogIds);
|
|
|
+ if (CollectionUtils.isEmpty(userLoginLogList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ userLoginLogList.forEach(userLoginLog -> {
|
|
|
+ String ipData = this.ipAssay(userLoginLog.getIp());
|
|
|
+ log.error("登录日志IP解析返回的结果, ipData : {}", ipData);
|
|
|
+ if (Strings.isBlank(ipData)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
|
|
|
+ .set(UserLoginLog::getIpData, ipData)
|
|
|
+ .eq(UserLoginLog::getId, userLoginLog.getId()));
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -140,7 +182,9 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
.ge(UserLoginLog::getCreateTime, LocalDateTime.now().minusMonths(2))
|
|
|
.orderByDesc(UserLoginLog::getCreateTime));
|
|
|
totalPage = pageUserLoginLog.getPages();
|
|
|
- userLoginLogList = pageUserLoginLog.getRecords();
|
|
|
+ userLoginLogList = pageUserLoginLog.getRecords().stream()
|
|
|
+ .filter(userLoginLog -> Strings.isBlank(userLoginLog.getIpData()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
log.error("登录日志执行页, page : {}, totalPage : {}, size : {}", page, totalPage, userLoginLogList.size());
|
|
|
this.pageUserLoginLogHandle(userLoginLogList);
|
|
|
} while (++page <= totalPage);
|
|
@@ -151,7 +195,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
log.error("登录日志单页数据为空, 不处理");
|
|
|
return;
|
|
|
}
|
|
|
- THREAD_POOL_IP_DATA_ASSAY.execute(() -> userLoginLogList.forEach(userLoginLog -> {
|
|
|
+ List<CompletableFuture<?>> futures = userLoginLogList.stream().map(userLoginLog -> CompletableFuture.runAsync(() -> {
|
|
|
if (Strings.isNotBlank(userLoginLog.getIpData())) {
|
|
|
return;
|
|
|
}
|
|
@@ -163,34 +207,65 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
|
|
|
userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
|
|
|
.set(UserLoginLog::getIpData, ipData)
|
|
|
.eq(UserLoginLog::getId, userLoginLog.getId()));
|
|
|
- }));
|
|
|
+ }, THREAD_POOL_IP_DATA_ASSAY)).collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("等待所有登录日志完成时出现异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void ipDataHandle(List<String> msgList) {
|
|
|
- msgList.forEach(msg -> {
|
|
|
+ List<CompletableFuture<?>> futures = msgList.stream().map(msg -> CompletableFuture.runAsync(() -> {
|
|
|
+ //数据解析, 消息类型(注册还是登录), 数据主键, ip地址
|
|
|
String[] array = msg.split(":");
|
|
|
- //消息类型(注册还是登录), 数据主键, ip地址
|
|
|
Tuple3<String, String, String> tuple3 = Tuple3.with(array[0], array[1], array[2]);
|
|
|
- String ipData = this.ipAssay(tuple3.third);
|
|
|
+ //IP信息
|
|
|
+ String ip = tuple3.third.contains("_") ? tuple3.third.split("_")[0] : tuple3.third;
|
|
|
+ //解析重试次数
|
|
|
+ int count = tuple3.third.contains("_") ? Integer.parseInt(tuple3.third.split("_")[1]) : 0;
|
|
|
+ //IP解析
|
|
|
+ String ipData = this.ipAssay(ip);
|
|
|
+ //解析失败业务处理
|
|
|
if (Strings.isBlank(ipData)) {
|
|
|
+ this.handleFailedIpAssay(msg, ip, count);
|
|
|
return;
|
|
|
}
|
|
|
- //注册用户, t_user表
|
|
|
- if (Objects.equals(tuple3.first, "REG")) {
|
|
|
- userService.update(new LambdaUpdateWrapper<User>()
|
|
|
- .set(User::getIpData, ipData)
|
|
|
- .eq(User::getId, Long.parseLong(tuple3.second))
|
|
|
- );
|
|
|
- }
|
|
|
- //登录日志, t_user_login_log表
|
|
|
- if (Objects.equals(tuple3.first, "LOGIN")) {
|
|
|
- userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
|
|
|
- .set(UserLoginLog::getIpData, ipData)
|
|
|
- .eq(UserLoginLog::getId, Long.parseLong(tuple3.second))
|
|
|
- );
|
|
|
- }
|
|
|
- });
|
|
|
+ //解析成功业务处理
|
|
|
+ this.handleSuccessIpAssay(tuple3.first, Long.parseLong(tuple3.second), ipData);
|
|
|
+ }, THREAD_POOL_IP_DATA_ASSAY_QUEUE)).collect(Collectors.toList());
|
|
|
+ try {
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("等待所有队列消息消费完成时出现异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleFailedIpAssay(String msg, String ip, int count) {
|
|
|
+ if (++count < 50) {
|
|
|
+ log.error("队列消息IP解析失败, 重新添加到队列, ip : {}, count : {}", ip, count);
|
|
|
+ redisUtil.addToSet(RedisKeyConstant.IP_DATA_ASSAY_QUEUE, msg + "_" + count);
|
|
|
+ } else {
|
|
|
+ log.error("队列消息IP解析失败, 重试次数达到上限, ip : {}, count : {}", ip, count);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleSuccessIpAssay(String type, Long id, String ipData) {
|
|
|
+ //注册用户, t_user表
|
|
|
+ if (Objects.equals(type, "REG")) {
|
|
|
+ userService.update(new LambdaUpdateWrapper<User>()
|
|
|
+ .set(User::getIpData, ipData)
|
|
|
+ .eq(User::getId, id)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ //登录日志, t_user_login_log表
|
|
|
+ if (Objects.equals(type, "LOGIN")) {
|
|
|
+ userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
|
|
|
+ .set(UserLoginLog::getIpData, ipData)
|
|
|
+ .eq(UserLoginLog::getId, id)
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private String ipAssay(String ip) {
|