Sfoglia il codice sorgente

fix : IP解析历史数据处理新增线程池调试02

bilingfeng 3 mesi fa
parent
commit
d15cf8e54d

+ 43 - 16
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/service/impl/IpDataAssayServiceImpl.java

@@ -8,10 +8,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.zanxiang.erp.base.ErpServer;
 import com.zanxiang.erp.base.pojo.vo.SysIpv4RpcVO;
 import com.zanxiang.erp.base.rpc.ISysIpv4Rpc;
+import com.zanxiang.game.module.manage.constant.RedisKeyConstant;
 import com.zanxiang.game.module.manage.service.IGameService;
 import com.zanxiang.game.module.manage.service.IIpDataAssayService;
 import com.zanxiang.game.module.manage.service.IUserLoginLogService;
 import com.zanxiang.game.module.manage.service.IUserService;
+import com.zanxiang.game.module.manage.utils.RedisUtil;
 import com.zanxiang.game.module.mybatis.entity.Game;
 import com.zanxiang.game.module.mybatis.entity.User;
 import com.zanxiang.game.module.mybatis.entity.UserLoginLog;
@@ -45,6 +47,9 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
     @DubboReference(providedBy = ErpServer.SERVER_DUBBO_NAME)
     private ISysIpv4Rpc sysIpv4Rpc;
 
+    @Autowired
+    private RedisUtil<String> redisUtil;
+
     @Autowired
     private IGameService gameService;
 
@@ -110,6 +115,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
             Page<User> pageUser = userService.page(new Page<>(page, pageSize), new LambdaQueryWrapper<User>()
                     .select(User::getId, User::getIp, User::getIpData)
                     .in(User::getGameId, gameIdList)
+                    .isNull(User::getIpData)
                     .orderByDesc(User::getCreateTime));
             totalPage = pageUser.getPages();
             userList = pageUser.getRecords();
@@ -173,6 +179,7 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
                     new LambdaQueryWrapper<UserLoginLog>()
                             .select(UserLoginLog::getId, UserLoginLog::getIp, UserLoginLog::getIpData)
                             .ge(UserLoginLog::getCreateTime, LocalDateTime.now().minusMonths(2))
+                            .isNull(UserLoginLog::getIpData)
                             .orderByDesc(UserLoginLog::getCreateTime));
             totalPage = pageUserLoginLog.getPages();
             userLoginLogList = pageUserLoginLog.getRecords();
@@ -212,25 +219,19 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
             //数据解析, 消息类型(注册还是登录), 数据主键, ip地址
             String[] array = msg.split(":");
             Tuple3<String, String, String> tuple3 = Tuple3.with(array[0], array[1], array[2]);
-            String ipData = this.ipAssay(tuple3.third);
+            //IP信息
+            String ip = tuple3.third.contains("_") ? tuple3.third.split("_")[0] : tuple3.third;
+            //解析重试次数
+            int count = tuple3.third.contains("_") ? Integer.parseInt(tuple3.third.split("_")[1]) : 0;
+            //IP解析
+            String ipData = this.ipAssay(ip);
+            //解析失败业务处理
             if (Strings.isBlank(ipData)) {
-                log.error("队列消息IP解析结果返回为空, ip : {}", tuple3.third);
+                this.handleFailedIpAssay(msg, ip, count);
                 return;
             }
-            //注册用户, t_user表
-            if (Objects.equals(tuple3.first, "REG")) {
-                userService.update(new LambdaUpdateWrapper<User>()
-                        .set(User::getIpData, ipData)
-                        .eq(User::getId, Long.parseLong(tuple3.second))
-                );
-            }
-            //登录日志, t_user_login_log表
-            if (Objects.equals(tuple3.first, "LOGIN")) {
-                userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
-                        .set(UserLoginLog::getIpData, ipData)
-                        .eq(UserLoginLog::getId, Long.parseLong(tuple3.second))
-                );
-            }
+            //解析成功业务处理
+            this.handleSuccessIpAssay(tuple3.first, Long.parseLong(tuple3.second), ipData);
         }, THREAD_POOL_IP_DATA_ASSAY_QUEUE)).collect(Collectors.toList());
         try {
             CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
@@ -239,6 +240,32 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
         }
     }
 
+    private void handleFailedIpAssay(String msg, String ip, int count) {
+        if (++count < 50) {
+            log.error("队列消息IP解析失败, 重新添加到队列, ip : {}, count : {}", ip, count);
+            redisUtil.addToSet(RedisKeyConstant.IP_DATA_ASSAY_QUEUE, msg + "_" + count);
+        } else {
+            log.error("队列消息IP解析失败, 重试次数达到上限, ip : {}, count : {}", ip, count);
+        }
+    }
+
+    private void handleSuccessIpAssay(String type, Long id, String ipData) {
+        //注册用户, t_user表
+        if (Objects.equals(type, "REG")) {
+            userService.update(new LambdaUpdateWrapper<User>()
+                    .set(User::getIpData, ipData)
+                    .eq(User::getId, id)
+            );
+        }
+        //登录日志, t_user_login_log表
+        if (Objects.equals(type, "LOGIN")) {
+            userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
+                    .set(UserLoginLog::getIpData, ipData)
+                    .eq(UserLoginLog::getId, id)
+            );
+        }
+    }
+
     private String ipAssay(String ip) {
         if (Strings.isBlank(ip)) {
             return null;

+ 1 - 1
game-module/game-module-sdk/src/main/java/com/zanxiang/game/module/sdk/SDKApplication.java

@@ -25,7 +25,7 @@ public class SDKApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(SDKApplication.class, args);
-        System.out.println("赞象SDK服务启动成功 <新增IP解析任务队列01> ( ´・・)ノ(._.`) \n" +
+        System.out.println("赞象SDK服务启动成功 <新增IP解析任务队列02> ( ´・・)ノ(._.`) \n" +
                 " ___________ _   __\n" +
                 "/  ___|  _  \\ | / /\n" +
                 "\\ `--.| | | | |/ / \n" +