Forráskód Böngészése

fix : IP解析历史数据处理新增线程池

bilingfeng 3 hónapja
szülő
commit
71d9ff9398

+ 1 - 1
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/ManageApplication.java

@@ -23,7 +23,7 @@ public class ManageApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(ManageApplication.class, args);
-        System.out.println("赞象Manage服务启动成功 < (IP解析历史数据处理调试 ・・)ノ(._.`) \n" +
+        System.out.println("赞象Manage服务启动成功 < (IP解析历史数据处理新增线程池 ・・)ノ(._.`) \n" +
                 "___  ___  ___   _   _   ___  _____  _____ \n" +
                 "|  \\/  | / _ \\ | \\ | | / _ \\|  __ \\|  ___|\n" +
                 "| .  . |/ /_\\ \\|  \\| |/ /_\\ \\ |  \\/| |__  \n" +

+ 49 - 26
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/service/impl/IpDataAssayServiceImpl.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.github.sd4324530.jtuple.Tuple3;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.zanxiang.erp.base.ErpServer;
 import com.zanxiang.erp.base.pojo.vo.SysIpv4RpcVO;
 import com.zanxiang.erp.base.rpc.ISysIpv4Rpc;
@@ -25,6 +26,9 @@ import org.springframework.stereotype.Service;
 import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -48,6 +52,15 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
     @Autowired
     private IUserLoginLogService userLoginLogService;
 
+    private static final ThreadPoolExecutor THREAD_POOL_IP_DATA_ASSAY = new ThreadPoolExecutor(
+            10,
+            10,
+            0,
+            TimeUnit.MINUTES,
+            new LinkedBlockingQueue<>(),
+            new ThreadFactoryBuilder()
+                    .setNameFormat("sync-ipDataAssay-%d").build());
+
     @Override
     public void userHandle(long supperGameId) {
         long page = 1L;
@@ -68,23 +81,27 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
                     .orderByDesc(User::getCreateTime));
             totalPage = pageUser.getPages();
             List<User> userList = pageUser.getRecords();
-            log.error("当前执行页, page : {}, total : {}, size : {}", page, totalPage, userList.size());
-            userList.forEach(user -> {
-                if (Strings.isNotBlank(user.getIpData())) {
-                    return;
-                }
-                String ipData = this.ipAssay(user.getIp());
-                if (Strings.isBlank(ipData)) {
-                    log.error("IP解析结果返回为空, ip : {}", user.getIp());
-                    return;
-                }
-                userService.update(new LambdaUpdateWrapper<User>()
-                        .set(User::getIpData, ipData)
-                        .eq(User::getId, user.getId()));
-            });
+            log.error("用户执行页, page : {}, totalPage : {}, size : {}", page, totalPage, userList.size());
+            this.pageUserHandle(userList);
         } while (++page <= totalPage);
     }
 
+    private void pageUserHandle(List<User> userList) {
+        THREAD_POOL_IP_DATA_ASSAY.execute(() -> userList.forEach(user -> {
+            if (Strings.isNotBlank(user.getIpData())) {
+                return;
+            }
+            String ipData = this.ipAssay(user.getIp());
+            if (Strings.isBlank(ipData)) {
+                log.error("用户IP解析结果返回为空, ip : {}", user.getIp());
+                return;
+            }
+            userService.update(new LambdaUpdateWrapper<User>()
+                    .set(User::getIpData, ipData)
+                    .eq(User::getId, user.getId()));
+        }));
+    }
+
     @Override
     public void userLoginLogHandle() {
         long page = 1L;
@@ -98,21 +115,27 @@ public class IpDataAssayServiceImpl implements IIpDataAssayService {
                             .orderByDesc(UserLoginLog::getCreateTime));
             totalPage = pageUserLoginLog.getPages();
             List<UserLoginLog> userLoginLogList = pageUserLoginLog.getRecords();
-            userLoginLogList.forEach(userLoginLog -> {
-                if (Strings.isNotBlank(userLoginLog.getIpData())) {
-                    return;
-                }
-                String ipData = this.ipAssay(userLoginLog.getIp());
-                if (Strings.isBlank(ipData)) {
-                    return;
-                }
-                userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
-                        .set(UserLoginLog::getIpData, ipData)
-                        .eq(UserLoginLog::getId, userLoginLog.getId()));
-            });
+            log.error("登录日志执行页, page : {}, totalPage : {}, size : {}", page, totalPage, userLoginLogList.size());
+            this.pageUserLoginLogHandle(userLoginLogList);
         } while (++page <= totalPage);
     }
 
+    private void pageUserLoginLogHandle(List<UserLoginLog> userLoginLogList) {
+        THREAD_POOL_IP_DATA_ASSAY.execute(() -> userLoginLogList.forEach(userLoginLog -> {
+            if (Strings.isNotBlank(userLoginLog.getIpData())) {
+                return;
+            }
+            String ipData = this.ipAssay(userLoginLog.getIp());
+            if (Strings.isBlank(ipData)) {
+                log.error("登录日志IP解析结果返回为空, ip : {}", userLoginLog.getIp());
+                return;
+            }
+            userLoginLogService.update(new LambdaUpdateWrapper<UserLoginLog>()
+                    .set(UserLoginLog::getIpData, ipData)
+                    .eq(UserLoginLog::getId, userLoginLog.getId()));
+        }));
+    }
+
     @Override
     public void ipDataHandle(List<String> msgList) {
         msgList.forEach(msg -> {