Переглянути джерело

fix : Websocket 增加心跳

bilingfeng 1 рік тому
батько
коміт
4768359816

+ 1 - 1
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/ManageApplication.java

@@ -23,7 +23,7 @@ public class ManageApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(ManageApplication.class, args);
-        System.out.println("赞象Manage服务启动成功 <Websocket增加心跳机制> ( ´・・)ノ(._.`) \n" +
+        System.out.println("赞象Manage服务启动成功 <Websocket调试修改> ( ´・・)ノ(._.`) \n" +
                 "___  ___  ___   _   _   ___  _____  _____ \n" +
                 "|  \\/  | / _ \\ | \\ | | / _ \\|  __ \\|  ___|\n" +
                 "| .  . |/ /_\\ \\|  \\| |/ /_\\ \\ |  \\/| |__  \n" +

+ 2 - 2
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/pojo/dto/KfWebSocketMsgDTO.java

@@ -86,8 +86,8 @@ public class KfWebSocketMsgDTO {
         return new KfWebSocketMsgDTO(webSocketMsgType, 0, "success", kfUserId);
     }
 
-    public static KfWebSocketMsgDTO fail(Long kfUserId, KfWebSocketMsgEnum webSocketMsgType, String errorMsg) {
-        return new KfWebSocketMsgDTO(webSocketMsgType, 400, errorMsg, kfUserId);
+    public static KfWebSocketMsgDTO fail(KfWebSocketMsgEnum webSocketMsgType, String errorMsg) {
+        return new KfWebSocketMsgDTO(webSocketMsgType, 400, errorMsg, null);
     }
 
     @Data

+ 5 - 0
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/pojo/params/KfWebSocketMsgParam.java

@@ -16,6 +16,11 @@ public class KfWebSocketMsgParam {
      */
     private KfWebSocketMsgEnum webSocketMsgType;
 
+    /**
+     * 客服id
+     */
+    private Long kfUserId;
+
     /**
      * 客服登录token, 必传参数
      */

+ 12 - 5
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/service/impl/KfAppletMsgServiceImpl.java

@@ -2,18 +2,19 @@ package com.zanxiang.game.module.manage.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.zanxiang.game.module.manage.constant.RedisKeyConstant;
 import com.zanxiang.game.module.manage.enums.KfRoomMsgOwnerEnum;
 import com.zanxiang.game.module.manage.enums.KfRoomMsgTypeEnum;
 import com.zanxiang.game.module.manage.enums.KfWebSocketMsgEnum;
 import com.zanxiang.game.module.manage.pojo.dto.KfAppletMsgDTO;
 import com.zanxiang.game.module.manage.pojo.dto.KfWebSocketMsgDTO;
 import com.zanxiang.game.module.manage.service.*;
-import com.zanxiang.game.module.manage.websocket.KfMsgWebsocketHandler;
 import com.zanxiang.game.module.mybatis.entity.*;
 import com.zanxiang.module.util.JsonUtil;
 import com.zanxiang.module.util.bean.BeanUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
@@ -30,10 +31,10 @@ import java.util.Objects;
 public class KfAppletMsgServiceImpl implements IKfAppletMsgService {
 
     @Autowired
-    private IKfSessionUserService kfSessionUserService;
+    private RedisTemplate<String, String> redisTemplate;
 
     @Autowired
-    private KfMsgWebsocketHandler kfMsgWebsocketHandler;
+    private IKfSessionUserService kfSessionUserService;
 
     @Autowired
     private IKfRoomService kfRoomService;
@@ -93,7 +94,7 @@ public class KfAppletMsgServiceImpl implements IKfAppletMsgService {
                     .eq(KfSessionUser::getOpenId, kfAppletMsgDTO.getFromUserName()));
         }
         //消息转发到redis频道
-        kfMsgWebsocketHandler.pushMessage(this.transform(kfRoom, gameApplet.getGameId(), kfRoomMsg, msgContent));
+        this.pushMessage(this.transform(kfRoom, gameApplet.getGameId(), kfRoomMsg, msgContent));
     }
 
     private KfWebSocketMsgDTO transform(KfRoom kfRoom, Long gameId, KfRoomMsg kfRoomMsg, KfWebSocketMsgDTO.MsgContentBean msgContent) {
@@ -103,7 +104,6 @@ public class KfAppletMsgServiceImpl implements IKfAppletMsgService {
         //
 
 
-
         return KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(kfRoom == null ? KfWebSocketMsgEnum.WEBSOCKET_MSG_WAIT_LIST : KfWebSocketMsgEnum.WEBSOCKET_MSG_ROOM_MSG)
                 .kfUserId(kfRoom == null ? null : kfRoom.getKfUserId())
@@ -160,4 +160,11 @@ public class KfAppletMsgServiceImpl implements IKfAppletMsgService {
         }
         return msgContentBean;
     }
+
+    /**
+     * 消息发送到redis广播
+     */
+    private void pushMessage(KfWebSocketMsgDTO kfWebSocketMsgDTO) {
+        redisTemplate.convertAndSend(RedisKeyConstant.KF_MSG_REDIS_LISTEN_TOPIC, JsonUtil.toString(kfWebSocketMsgDTO));
+    }
 }

+ 1 - 1
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/websocket/KfMsgRedisListener.java

@@ -47,7 +47,7 @@ public class KfMsgRedisListener implements MessageListener {
         }
         //发送给所有在线客服
         log.error("发送消息给所有客服客服 kfWebSocketMsgDTO : {}", JsonUtil.toString(kfWebSocketMsgDTO));
-        List<WebSocketSession> openSessions = kfMsgWebSocketSessionRegistry.getAllAessions();
+        List<WebSocketSession> openSessions = kfMsgWebSocketSessionRegistry.getAllSessions();
         openSessions.forEach(session -> {
             if (session != null && session.isOpen()) {
                 try {

+ 1 - 1
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/websocket/KfMsgWebSocketSessionRegistry.java

@@ -29,7 +29,7 @@ public class KfMsgWebSocketSessionRegistry {
         sessions.remove(userId);
     }
 
-    public List<WebSocketSession> getAllAessions() {
+    public List<WebSocketSession> getAllSessions() {
         return new ArrayList<>(sessions.values());
     }
 }

+ 101 - 91
game-module/game-module-manage/src/main/java/com/zanxiang/game/module/manage/websocket/KfMsgWebsocketHandler.java

@@ -3,7 +3,6 @@ package com.zanxiang.game.module.manage.websocket;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.zanxiang.erp.base.pojo.TokenInfo;
 import com.zanxiang.erp.security.util.SecurityUtil;
-import com.zanxiang.game.module.manage.constant.RedisKeyConstant;
 import com.zanxiang.game.module.manage.enums.KfRoomMsgOwnerEnum;
 import com.zanxiang.game.module.manage.enums.KfWebSocketMsgEnum;
 import com.zanxiang.game.module.manage.pojo.dto.KfWebSocketMsgDTO;
@@ -19,14 +18,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.logging.log4j.util.Strings;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
-import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
-import org.springframework.web.socket.CloseStatus;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.WebSocketMessage;
-import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.*;
 import reactor.util.function.Tuple2;
 
+import java.io.IOException;
 import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Objects;
@@ -41,9 +37,6 @@ import java.util.Objects;
 @Component
 public class KfMsgWebsocketHandler implements WebSocketHandler {
 
-    @Autowired
-    private RedisTemplate<String, String> redisTemplate;
-
     @Autowired
     private KfMsgWebSocketSessionRegistry kfMsgWebSocketSessionRegistry;
 
@@ -70,66 +63,60 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
     public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
         String msgStr = message.getPayload().toString();
         log.error("收到前端消息 msgStr : {}", msgStr);
+
         //消息解析
         KfWebSocketMsgParam param = JsonUtil.toObj(msgStr, KfWebSocketMsgParam.class);
         log.error("收到前端消息解析结果 kfWebSocketMsgParam : {}", JsonUtil.toString(param));
+
         //消息类型
         KfWebSocketMsgEnum webSocketMsgType = param.getWebSocketMsgType();
         //游戏id
         Long gameId = param.getGameId();
-        //请求令牌
-        String token = param.getToken();
-        //参数验证
-        boolean checkResult = this.paramCheck(webSocketMsgType, token, gameId);
-        //参数验证失败, 没有后续动作
-        if (!checkResult) {
-            log.error("token验证失败, token : {}", token);
-            return;
-        }
-        //心跳
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_CONNECT_HEART_BEAT)) {
-            log.error("收到客户端心跳消息, param : {}", JsonUtil.toString(param));
+        //参数验证不通过, 直接结束
+        if (!this.paramCheck(session, webSocketMsgType, param.getToken(), gameId)) {
             return;
         }
-        //握手-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_KF_HAND_SHAKE)) {
-            log.error("首次握手, kfUserId : {}", SecurityUtil.getUserId());
-            this.kfHandShake(session, webSocketMsgType);
-        }
-        //创建连接-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_KF_CREATE_CONNECT)) {
-            log.error("创建连接, kfUserId : {}, gameId : {}", SecurityUtil.getUserId(), gameId);
-            this.kfCreateConnect(webSocketMsgType, gameId);
-        }
-        //玩家接入-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_USER_CONNECT_JOIN)) {
-            log.error("玩家接入, kfUserId : {}, gameId : {}", SecurityUtil.getUserId(), gameId);
-            this.userConnectJoin(param);
-        }
-        //获取房间历史消息-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_ROOM_HISTORY)) {
-            log.error("获取房间历史消息, kfUserId : {}, gameId : {}, roomId : {}",
-                    SecurityUtil.getUserId(), param.getGameId(), param.getRoomId());
-            this.msgRoomHistory(param);
-        }
-        //客服发送消息-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_KF_SEND)) {
-
-        }
-        //获取已结束房间列表-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_FINISH_ROOM_LIST)) {
-            log.error("获取已结束房间列表, kfUserId : {}, gameId : {}", SecurityUtil.getUserId(), param.getGameId());
-            this.finishRoomList(param);
-        }
-        //结束会话-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_FINISH_SESSION)) {
-            log.error("结束会话, kfUserId : {}, gameId : {}, roomId : {}",
-                    SecurityUtil.getUserId(), param.getGameId(), param.getRoomId());
-            this.kfFinishSession(webSocketMsgType, param.getGameId(), param.getRoomId());
-        }
-        //快捷回复-消息处理
-        if (Objects.equals(webSocketMsgType, KfWebSocketMsgEnum.WEBSOCKET_MSG_QUICK_REPLY)) {
 
+        //处理不同类型的消息
+        switch (webSocketMsgType) {
+            case WEBSOCKET_MSG_CONNECT_HEART_BEAT:
+                log.error("收到客户端心跳消息, param : {}", JsonUtil.toString(param));
+                break;
+            case WEBSOCKET_MSG_KF_HAND_SHAKE:
+                log.error("首次握手, kfUserId : {}", SecurityUtil.getUserId());
+                kfHandShake(session, webSocketMsgType);
+                break;
+            case WEBSOCKET_MSG_KF_CREATE_CONNECT:
+                log.error("创建连接, kfUserId : {}, gameId : {}", SecurityUtil.getUserId(), gameId);
+                kfCreateConnect(session, webSocketMsgType, gameId);
+                break;
+            case WEBSOCKET_MSG_USER_CONNECT_JOIN:
+                log.error("玩家接入, kfUserId : {}, gameId : {}", SecurityUtil.getUserId(), gameId);
+                userConnectJoin(session, param);
+                break;
+            case WEBSOCKET_MSG_ROOM_HISTORY:
+                log.error("获取房间历史消息, kfUserId : {}, gameId : {}, roomId : {}",
+                        SecurityUtil.getUserId(), gameId, param.getRoomId());
+                msgRoomHistory(session, param);
+                break;
+            case WEBSOCKET_MSG_KF_SEND:
+                // 客服发送消息,可以添加对应的处理逻辑
+                break;
+            case WEBSOCKET_MSG_FINISH_ROOM_LIST:
+                log.error("获取已结束房间列表, kfUserId : {}, gameId : {}", SecurityUtil.getUserId(), gameId);
+                finishRoomList(session, param);
+                break;
+            case WEBSOCKET_MSG_FINISH_SESSION:
+                log.error("结束会话, kfUserId : {}, gameId : {}, roomId : {}",
+                        SecurityUtil.getUserId(), gameId, param.getRoomId());
+                kfFinishSession(session, webSocketMsgType, gameId, param.getRoomId());
+                break;
+            case WEBSOCKET_MSG_QUICK_REPLY:
+                // 快捷回复,可以添加对应的处理逻辑
+                break;
+            default:
+                // 未知消息类型,可以添加对应的处理逻辑
+                this.sendMessage(session, KfWebSocketMsgDTO.fail(webSocketMsgType, "参数错误, 未知的消息类型"));
         }
     }
 
@@ -158,7 +145,7 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
         return false;
     }
 
-    private void kfFinishSession(KfWebSocketMsgEnum webSocketMsgType, Long gameId, Long roomId) {
+    private void kfFinishSession(WebSocketSession session, KfWebSocketMsgEnum webSocketMsgType, Long gameId, Long roomId) {
         //房间在线状态更新
         kfRoomService.update(new LambdaUpdateWrapper<KfRoom>()
                 .set(KfRoom::getOnline, Boolean.FALSE)
@@ -166,7 +153,7 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
                 .eq(KfRoom::getId, roomId));
         //推送完整的已链接房间列表
         List<KfWebSocketMsgDTO.RoomBean> onlineRoomList = kfRoomService.getOnlineRoomList(gameId);
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(webSocketMsgType)
                 .kfUserId(SecurityUtil.getUserId())
                 .gameId(gameId)
@@ -174,10 +161,10 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
                 .build());
     }
 
-    private void finishRoomList(KfWebSocketMsgParam param) {
+    private void finishRoomList(WebSocketSession session, KfWebSocketMsgParam param) {
         Tuple2<KfWebSocketMsgDTO.PageBean, List<KfWebSocketMsgDTO.RoomBean>> tuple2 = kfRoomService
                 .getFinishRoomList(param.getGameId(), param.getPage());
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(param.getWebSocketMsgType())
                 .kfUserId(SecurityUtil.getUserId())
                 .gameId(param.getGameId())
@@ -186,9 +173,9 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
                 .build());
     }
 
-    private void msgRoomHistory(KfWebSocketMsgParam param) {
+    private void msgRoomHistory(WebSocketSession session, KfWebSocketMsgParam param) {
         if (param.getRoomId() == null) {
-            this.pushMessage(KfWebSocketMsgDTO.fail(SecurityUtil.getUserId(), param.getWebSocketMsgType(),
+            this.sendMessage(session, KfWebSocketMsgDTO.fail(param.getWebSocketMsgType(),
                     "获取房间历史消息参数错误, roomId不可为空, param : " + JsonUtil.toString(param)));
         }
         //当获取第一页的时候, 房间未读消息全部更新成已读消息
@@ -201,7 +188,7 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
         //分页获取房间消息列表
         Tuple2<KfWebSocketMsgDTO.PageBean, List<KfWebSocketMsgDTO.RoomMsgBean>> tuple2 = kfRoomMsgService
                 .msgRoomHistory(param.getRoomId(), param.getPage());
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(param.getWebSocketMsgType())
                 .kfUserId(SecurityUtil.getUserId())
                 .page(tuple2.getT1())
@@ -211,14 +198,13 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
                 .build());
     }
 
-    private void userConnectJoin(KfWebSocketMsgParam param) {
+    private void userConnectJoin(WebSocketSession session, KfWebSocketMsgParam param) {
         if (Strings.isBlank(param.getOpenId())) {
-            this.pushMessage(KfWebSocketMsgDTO.fail(SecurityUtil.getUserId(), param.getWebSocketMsgType(),
+            this.sendMessage(session, KfWebSocketMsgDTO.fail(param.getWebSocketMsgType(),
                     "接入玩家参数错误, openId和gameId都不可为空, param : " + JsonUtil.toString(param)));
         }
         //todo : 判断玩家是否已被接入
 
-
         //玩家更新
         kfSessionUserService.update(new LambdaUpdateWrapper<KfSessionUser>()
                 .set(KfSessionUser::getIsWait, Boolean.FALSE)
@@ -235,7 +221,7 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
                 .eq(KfRoomMsg::getReadStatus, Boolean.FALSE));
         //发送消息, 给该客服返回完整的已接入房间列表
         List<KfWebSocketMsgDTO.RoomBean> roomList = kfRoomService.getOnlineRoomList(param.getGameId());
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(param.getWebSocketMsgType())
                 .kfUserId(SecurityUtil.getUserId())
                 .gameId(param.getGameId())
@@ -243,20 +229,20 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
                 .build());
         //发送消息, 给所有在线客服推送完整待接入列表
         List<KfWebSocketMsgDTO.WaitUserBean> waitUserList = kfSessionUserService.getWaitUserList(param.getGameId());
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(param.getWebSocketMsgType())
                 .gameId(param.getGameId())
                 .waitUserList(waitUserList)
                 .build());
     }
 
-    private void kfCreateConnect(KfWebSocketMsgEnum msgTypeEnum, Long gameId) {
+    private void kfCreateConnect(WebSocketSession session, KfWebSocketMsgEnum msgTypeEnum, Long gameId) {
         //获取待接入列表
         List<KfWebSocketMsgDTO.WaitUserBean> waitUserList = kfSessionUserService.getWaitUserList(gameId);
         //获取已接入房间列表
         List<KfWebSocketMsgDTO.RoomBean> roomList = kfRoomService.getOnlineRoomList(gameId);
         //发送消息
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(msgTypeEnum)
                 .kfUserId(SecurityUtil.getUserId())
                 .gameId(gameId)
@@ -266,48 +252,72 @@ public class KfMsgWebsocketHandler implements WebSocketHandler {
     }
 
     private void kfHandShake(WebSocketSession session, KfWebSocketMsgEnum msgTypeEnum) {
-        //添加会话
-        Long kfUserId = SecurityUtil.getUserId();
-        session.getAttributes().put("kfUserId", kfUserId);
-        kfMsgWebSocketSessionRegistry.addSession(kfUserId, session);
         //获取游戏列表
         List<KfWebSocketMsgDTO.GameBean> gameList = kfRoomService.getKfGameList();
         //发送消息
-        this.pushMessage(KfWebSocketMsgDTO.builder()
+        this.sendMessage(session, KfWebSocketMsgDTO.builder()
                 .webSocketMsgType(msgTypeEnum)
                 .kfUserId(SecurityUtil.getUserId())
                 .gameList(gameList)
                 .build());
     }
 
-    private boolean paramCheck(KfWebSocketMsgEnum webSocketMsgType, String token, Long gameId) {
+    private boolean paramCheck(WebSocketSession session, KfWebSocketMsgEnum webSocketMsgType, String token, Long gameId) {
+        //gameId参数为空
+        if (gameId == null) {
+            //心跳和握手消息, 不需要携带gameId
+            if (!Objects.equals(KfWebSocketMsgEnum.WEBSOCKET_MSG_CONNECT_HEART_BEAT, webSocketMsgType)
+                    || !Objects.equals(KfWebSocketMsgEnum.WEBSOCKET_MSG_KF_HAND_SHAKE, webSocketMsgType)) {
+                this.sendMessage(session, KfWebSocketMsgDTO.fail(webSocketMsgType, "参数错误, gameId为空"));
+                return Boolean.FALSE;
+            }
+        }
         //令牌为空
         if (Strings.isBlank(token)) {
-            this.pushMessage(KfWebSocketMsgDTO.fail(SecurityUtil.getUserId(), webSocketMsgType, "非法参数, token令牌不可为空"));
+            log.error("非法参数, token令牌和客服id不可为空");
+            this.sendMessage(session, KfWebSocketMsgDTO.fail(webSocketMsgType, "非法参数, token令牌和kfUserId不可为空"));
             return Boolean.FALSE;
         }
         //令牌验证
-        TokenInfo tokenInfo = SecurityUtil.parseToken(token);
-        if (tokenInfo == null) {
-            this.pushMessage(KfWebSocketMsgDTO.fail(SecurityUtil.getUserId(), webSocketMsgType, "参数错误, 令牌验证不通过"));
+        TokenInfo tokenInfo;
+        try {
+            tokenInfo = SecurityUtil.parseToken(token);
+        } catch (Exception e) {
+            log.error("token验证异常, token : {}, e : {}", token, e.getMessage());
+            this.sendMessage(session, KfWebSocketMsgDTO.fail(webSocketMsgType, "token验证异常, " + e.getMessage()));
             return Boolean.FALSE;
         }
-        //非首次握手, 校验gameId参数
-        if (!Objects.equals(KfWebSocketMsgEnum.WEBSOCKET_MSG_KF_HAND_SHAKE, webSocketMsgType)
-                && gameId == null) {
-            this.pushMessage(KfWebSocketMsgDTO.fail(SecurityUtil.getUserId(), webSocketMsgType, "参数错误, gameId为空"));
+        if (tokenInfo == null) {
+            log.error("参数错误, 令牌验证不通过, token : {}", token);
+            this.sendMessage(session, KfWebSocketMsgDTO.fail(webSocketMsgType, "参数错误, 令牌验证不通过"));
             return Boolean.FALSE;
         }
         //将token设置到当前线程
-        SecurityUtil.fillToken(token);
+        try {
+            SecurityUtil.fillToken(token);
+        } catch (Exception e) {
+            log.error("token刷新到本地线程失败, token : {}, e : {}", token, e.getMessage());
+            this.sendMessage(session, KfWebSocketMsgDTO.fail(webSocketMsgType, "token刷新到本地线程失败"));
+            return Boolean.FALSE;
+        }
+        //添加会话
+        if (kfMsgWebSocketSessionRegistry.getSession(SecurityUtil.getUserId()) == null) {
+            session.getAttributes().put("kfUserId", SecurityUtil.getUserId());
+            kfMsgWebSocketSessionRegistry.addSession(SecurityUtil.getUserId(), session);
+        }
         //返回验证通过
         return Boolean.TRUE;
     }
 
     /**
-     * 消息发送到redis广播
+     * session直接发送消息
+     *
+     * @param session : 会话对象
      */
-    public void pushMessage(KfWebSocketMsgDTO kfWebSocketMsgDTO) {
-        redisTemplate.convertAndSend(RedisKeyConstant.KF_MSG_REDIS_LISTEN_TOPIC, JsonUtil.toString(kfWebSocketMsgDTO));
+    private void sendMessage(WebSocketSession session, KfWebSocketMsgDTO kfWebSocketMsgDTO) {
+        try {
+            session.sendMessage(new TextMessage(JsonUtil.toString(kfWebSocketMsgDTO)));
+        } catch (IOException ignored) {
+        }
     }
 }