From d4ef4a8eebc8b115bacd9b3170618fd024938042 Mon Sep 17 00:00:00 2001
From: xiangpei <xiangpei@timesnew.cn>
Date: 星期二, 23 四月 2024 17:29:02 +0800
Subject: [PATCH] websocket优化

---
 ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java        |   40 ++++
 ycl-server/src/main/java/com/ycl/system/service/TokenService.java               |   38 +++
 /dev/null                                                                       |   67 ------
 ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java                 |   30 ++
 ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java                    |   78 +++++++
 ycl-common/src/main/java/enumeration/MsgTypeEnum.java                           |   19 +
 ycl-server/src/main/java/com/ycl/websocket/msg/Message.java                     |   22 ++
 ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java        |  120 ++++++++++++
 ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java |   63 ++++++
 ycl-server/src/main/java/com/ycl/PlatformApplication.java                       |    8 
 ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java               |   31 +++
 ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java        |   35 +++
 12 files changed, 471 insertions(+), 80 deletions(-)

diff --git a/ycl-common/src/main/java/enumeration/MsgTypeEnum.java b/ycl-common/src/main/java/enumeration/MsgTypeEnum.java
new file mode 100644
index 0000000..2b3741c
--- /dev/null
+++ b/ycl-common/src/main/java/enumeration/MsgTypeEnum.java
@@ -0,0 +1,19 @@
+package enumeration;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/21 10:25
+ */
+public enum MsgTypeEnum {
+
+    ;
+
+    private final String value;
+
+    private final String desc;
+
+    MsgTypeEnum(String value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+}
diff --git a/ycl-server/src/main/java/com/ycl/PlatformApplication.java b/ycl-server/src/main/java/com/ycl/PlatformApplication.java
index 4f02f8a..ba6a7ac 100644
--- a/ycl-server/src/main/java/com/ycl/PlatformApplication.java
+++ b/ycl-server/src/main/java/com/ycl/PlatformApplication.java
@@ -1,8 +1,6 @@
 package com.ycl;
 
-import com.ycl.websocket.WebsocketServer;
 import lombok.extern.slf4j.Slf4j;
-import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cache.annotation.EnableCaching;
@@ -16,11 +14,5 @@
     public static void main(String[] args) {
         SpringApplication.run(PlatformApplication.class, args);
         log.info("(鈾モ棤鈥库棤)锞夛緸  鑷础杩愮淮骞冲彴鍚姩鎴愬姛   醿�(麓凇`醿�)锞�");
-        try {
-            WebsocketServer.runWebsocket();
-
-        } catch (Exception e) {
-            log.info("websocket鍚姩澶辫触");
-        }
     }
 }
diff --git a/ycl-server/src/main/java/com/ycl/system/service/TokenService.java b/ycl-server/src/main/java/com/ycl/system/service/TokenService.java
index 37c87dc..06c060f 100644
--- a/ycl-server/src/main/java/com/ycl/system/service/TokenService.java
+++ b/ycl-server/src/main/java/com/ycl/system/service/TokenService.java
@@ -83,6 +83,33 @@
         return null;
     }
 
+
+    /**
+     * 鑾峰彇鐢ㄦ埛韬唤淇℃伅
+     *
+     * @return 鐢ㄦ埛淇℃伅
+     */
+    public LoginUser getLoginUser(String token)
+    {
+        if (StringUtils.isNotEmpty(token))
+        {
+            try
+            {
+                Claims claims = parseToken(token);
+                // 瑙f瀽瀵瑰簲鐨勬潈闄愪互鍙婄敤鎴蜂俊鎭�
+                String uuid = (String) claims.get(Constants.LOGIN_USER_KEY);
+                String userKey = getTokenKey(uuid);
+                LoginUser user = redisCache.getCacheObject(userKey);
+                return user;
+            }
+            catch (Exception e)
+            {
+                log.error("鑾峰彇鐢ㄦ埛淇℃伅寮傚父'{}'", e.getMessage());
+            }
+        }
+        return null;
+    }
+
     /**
      * 璁剧疆鐢ㄦ埛韬唤淇℃伅
      */
@@ -210,6 +237,17 @@
     }
 
     /**
+     * 浠庝护鐗屼腑鑾峰彇鐢ㄦ埛鍚�
+     *
+     * @param token 浠ょ墝
+     * @return 鐢ㄦ埛鍚�
+     */
+    public LoginUser getUserInfoFromToken(String token)
+    {
+        return getLoginUser(token);
+    }
+
+    /**
      * 鑾峰彇璇锋眰token
      *
      * @param request
diff --git a/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java b/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java
deleted file mode 100644
index f2ef8e1..0000000
--- a/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package com.ycl.websocket;
-
-import io.netty.channel.Channel;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author锛歺p
- * @date锛�2024/4/18 16:10
- */
-public class NettyConfig {
-
-    /**
-     * 瀹氫箟鍏ㄥ眬鍗曞埄channel缁� 绠$悊鎵�鏈塩hannel
-     */
-    private static volatile ChannelGroup channelGroup = null;
-
-
-    /**
-     * 瀛樻斁璇锋眰ID涓巆hannel鐨勫搴斿叧绯�
-     */
-    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
-
-
-    /**
-     * 瀹氫箟涓ゆ妸閿�
-     */
-    private static final Object lock1 = new Object();
-    private static final Object lock2 = new Object();
-
-
-
-
-    public static ChannelGroup getChannelGroup() {
-        if (null == channelGroup) {
-            synchronized (lock1) {
-                if (null == channelGroup) {
-                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-                }
-            }
-        }
-        return channelGroup;
-    }
-
-
-    public static ConcurrentHashMap<String, Channel> getChannelMap() {
-        if (null == channelMap) {
-            synchronized (lock2) {
-                if (null == channelMap) {
-                    channelMap = new ConcurrentHashMap<>();
-                }
-            }
-        }
-        return channelMap;
-    }
-
-
-    public static Channel getChannel(String userId) {
-        if (null == channelMap) {
-            return getChannelMap().get(userId);
-        }
-        return channelMap.get(userId);
-    }
-
-}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java b/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java
new file mode 100644
index 0000000..f998654
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java
@@ -0,0 +1,78 @@
+package com.ycl.websocket;
+
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/19 14:02
+ */
+public class NettyConnect {
+
+    /**
+     * netty鎻愪緵鐨勭鐞嗚繛鎺ラ泦鍚�
+     */
+    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+    /**
+     * 瀛樻斁鐢ㄦ埛涓嶤hanel鐨勫搴斾俊鎭紝鐢ㄤ簬缁欐寚瀹氱敤鎴峰彂閫佹秷鎭�
+     */
+    private static ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(128);
+
+    private NettyConnect() {}
+
+    /**
+     * 鑾峰彇channel缁�
+     * @return
+     */
+    public static ChannelGroup getChannelGroup() {
+        return channelGroup;
+    }
+
+    /**
+     * 绉婚櫎channel鏃讹紝鍚屾椂绉婚櫎鐢ㄦ埛瀵瑰簲鍏崇郴
+     *
+     * @param channel
+     */
+    public static void removeChannel(Channel channel) {
+        Long userId = null;
+        for (Map.Entry<Long, Channel> channelEntry : userChannelMap.entrySet()) {
+            if (Objects.equals(channel, channelEntry.getValue())) {
+                userId = channelEntry.getKey();
+                break;
+            }
+        }
+        if (Objects.nonNull(userId)) {
+            userChannelMap.remove(userId);
+        }
+        channelGroup.remove(channel);
+    }
+
+    /**
+     * 鑾峰彇鐢ㄦ埛channel map
+     * @return
+     */
+    public static ConcurrentHashMap<Long, Channel> getUserChannelMap(){
+        return userChannelMap;
+    }
+
+    /**
+     * 绉婚櫎鐢ㄦ埛鍏崇郴鏃讹紝鍚屾椂绉婚櫎杩炴帴
+     *
+     * @param userId
+     */
+    public static void removeUserChannel(Integer userId) {
+        Channel channel = userChannelMap.get(userId);
+        if (Objects.nonNull(userId)) {
+            userChannelMap.remove(userId);
+            channelGroup.remove(channel);
+        }
+    }
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java b/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
deleted file mode 100644
index 351bf42..0000000
--- a/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.ycl.websocket;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.handler.codec.http.websocketx.*;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.util.Objects;
-
-
-public class WebSocketHandler extends SimpleChannelInboundHandler {
-
-    public static ChannelGroup connects = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
-        if(Objects.nonNull(o) && o instanceof WebSocketFrame){
-            this.handleWebSocketFrame(ctx, (WebSocketFrame) o);
-        }
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        System.out.println("鏈夋柊鐨勫鎴风杩炴帴涓婁簡");
-        connects.add(ctx.channel());
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        System.out.println("鏈夊鎴风鏂紑杩炴帴浜�");
-        connects.remove(ctx.channel());
-    }
-
-    // 澶勭悊ws鏁版嵁
-    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
-        // 澶勭悊鍏抽棴杩炴帴
-        if (frame instanceof CloseWebSocketFrame) {
-            connects.remove(ctx.channel());
-            ctx.close();
-            return;
-        }
-        if (frame instanceof TextWebSocketFrame) {
-            // 澶勭悊鏂囨湰娑堟伅
-            String text = ((TextWebSocketFrame) frame).text();
-            System.out.println("鏈嶅姟鍣ㄦ敹鍒板鎴风鏁版嵁锛�" +text);
-            // 姝ゅ涓虹兢鍙戯紝鍗曠嫭鍙戝彲浣跨敤connects.find(ctx.channel().id()).writeAndFlush()鍙戦��
-            connects.writeAndFlush(new TextWebSocketFrame("浣犲ソ瀹㈡埛绔�"));
-            // ...
-        } else if (frame instanceof BinaryWebSocketFrame) {
-            // 澶勭悊浜岃繘鍒舵秷鎭�
-            // ...
-        } else if (frame instanceof PingWebSocketFrame) {
-            // 澶勭悊 Ping 娑堟伅
-            // 鏀跺埌 Ping 娑堟伅锛屽洖搴斾竴涓� Pong 娑堟伅锛堣〃鏄庢垜杩樻椿鐫�锛�
-            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
-        } else if (frame instanceof PongWebSocketFrame) {
-            // 澶勭悊 Pong 娑堟伅
-            // pong娑堟伅濡傛灉娌℃湁鐗瑰畾闇�姹傦紝涓嶇敤澶勭悊
-        } else if (frame instanceof ContinuationWebSocketFrame) {
-            // 澶勭悊杩炵画甯ф秷鎭紙姣旇緝澶х殑鏁版嵁锛屽垎鐗囷級
-            // ...
-        }
-    }
-
-}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java b/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java
new file mode 100644
index 0000000..7afce0f
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java
@@ -0,0 +1,31 @@
+package com.ycl.websocket;
+
+import com.ycl.system.service.TokenService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/14 10:56
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class WebSocketListener {
+
+    private final WebsocketServer websocketServer;
+
+    @EventListener(classes = {ApplicationReadyEvent.class})
+    public void runWebSocket() {
+        try {
+            websocketServer.runWebsocket();
+        } catch (Exception e) {
+            log.warn("websocket鍚姩澶辫触");
+        }
+    }
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java b/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
index a2d1603..4620d76 100644
--- a/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
+++ b/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
@@ -1,5 +1,7 @@
 package com.ycl.websocket;
 
+import com.ycl.websocket.handler.HeartBeatHandler;
+import com.ycl.websocket.handler.WebSocketHandler;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -10,7 +12,12 @@
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
 
 /**
  * webscoket 鏈嶅姟
@@ -19,9 +26,13 @@
  * @date锛�2024/4/11 17:47
  */
 @Slf4j
+@Component
+@RequiredArgsConstructor
 public class WebsocketServer {
 
-    public static void runWebsocket() throws Exception {
+    private final WebSocketHandler webSocketHandler;
+
+    public void runWebsocket() throws Exception {
         // 澶勭悊 I/O 鎿嶄綔鐨勫绾跨▼浜嬩欢寰幆缁勶紙绾跨▼姹狅級銆俠ossGroup鐢ㄤ簬鎺ユ敹浼犲叆鐨勮繛鎺ワ紝workerGroup鐢ㄤ簬澶勭悊IO鎿嶄綔
         EventLoopGroup bossGroup = new NioEventLoopGroup();
         EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -38,22 +49,31 @@
                             // websocket鐨勬彙鎵嬮樁娈垫槸浣跨敤鐨凥ttp锛屾墍浠ラ渶瑕佹坊澶勭悊http璇锋眰锛�
                             // 鐢ㄤ簬灏� HTTP 璇锋眰鍜屽搷搴旇浆鎹负瀛楄妭娴佷互鍙婂皢瀛楄妭娴佽浆鎹负 HTTP 璇锋眰鍜屽搷搴�
                             ch.pipeline().addLast(new HttpServerCodec());
-                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 6000));
-                            ch.pipeline().addLast(new WebSocketHandler());
+                            ch.pipeline().addLast(new ObjectEncoder());
+                            // 浠ュ潡鐨勬柟寮忔潵鍐欑殑澶勭悊鍣�
+                            ch.pipeline().addLast(new ChunkedWriteHandler());
+//                            ch.pipeline().addLast(new NettyWebSocketParamHandler(secret));
+                            // 閽堝瀹㈡埛绔紝鑻�10s鍐呮棤璇讳簨浠跺垯瑙﹀彂蹇冭烦澶勭悊鏂规硶HeartBeatHandler#userEventTriggered
+                            ch.pipeline().addLast(new IdleStateHandler(60 , 60 , 60));
+                            // 鑷畾涔夌┖闂茬姸鎬佹娴�(鑷畾涔夊績璺虫娴媓andler)
+                            ch.pipeline().addLast(new HeartBeatHandler());
+                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+                            ch.pipeline().addLast(webSocketHandler);
                         }
                     })
                     // 璁剧疆鏈嶅姟鍣ㄩ�氶亾(涓婚�氶亾)鐨勯�夐」锛屾澶勬槸璁剧疆杩炴帴璇锋眰闃熷垪鐨勬渶澶ч暱搴︽槸128
                     .option(ChannelOption.SO_BACKLOG, 128);
 
             // 缁戝畾鏈嶅姟鍣ㄥ埌鎸囧畾鐨勭鍙o紝骞朵笖绛夊緟缁戝畾鎿嶄綔瀹屾垚銆�
-            ChannelFuture f = b.bind(8084).sync();
+            ChannelFuture f = b.bind(8044).sync();
             log.info("websocket鍚姩鎴愬姛");
+            log.info("绋嬪簭鍚姩鎴愬姛");
             // 绛夊緟鏈嶅姟鍣ㄧ殑閫氶亾鍏抽棴銆�
             f.channel().closeFuture().sync();
         } finally {
             workerGroup.shutdownGracefully();
             bossGroup.shutdownGracefully();
-            log.info("websocket鍏抽棴");
+            log.error("websocket鍏抽棴");
         }
     }
 
diff --git a/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java b/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java
new file mode 100644
index 0000000..1d8cecf
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java
@@ -0,0 +1,40 @@
+package com.ycl.websocket.handler;
+
+import com.ycl.websocket.NettyConnect;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+
+import java.util.Objects;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/19 14:41
+ */
+public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
+
+    private int lossConnectCount = 0;
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent){
+            IdleStateEvent event = (IdleStateEvent)evt;
+            if (event.state()== IdleState.READER_IDLE){
+                lossConnectCount ++;
+                if (lossConnectCount > 2){
+                    AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
+                    Long userId = ctx.channel().attr(userIdKey).get();
+                    if (Objects.nonNull(userId)) {
+                        NettyConnect.getUserChannelMap().remove(userId);
+                    }
+                    NettyConnect.getChannelGroup().remove(ctx.channel());
+                    ctx.channel().close();
+                }
+            }
+        }else {
+            super.userEventTriggered(ctx,evt);
+        }
+    }
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java b/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java
new file mode 100644
index 0000000..4167508
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java
@@ -0,0 +1,120 @@
+package com.ycl.websocket.handler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ycl.system.model.LoginUser;
+import com.ycl.system.service.TokenService;
+import com.ycl.websocket.NettyConnect;
+import com.ycl.websocket.msg.Message;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.websocketx.*;
+import io.netty.util.AttributeKey;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.util.Objects;
+
+@Slf4j
+@ChannelHandler.Sharable
+@Component
+@RequiredArgsConstructor
+public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
+
+    private final TokenService tokenService;
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
+        this.handleWebSocketFrame(ctx, msg);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        log.info("鏈夊鎴风杩炴帴浜�");
+        if (!NettyConnect.getChannelGroup().contains(ctx.channel())) {
+            NettyConnect.getChannelGroup().add(ctx.channel());
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        log.info("鏈夊鎴风鏂紑杩炴帴浜�");
+        AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
+        Long userId = ctx.channel().attr(userIdKey).get();
+        if (Objects.nonNull(userId)) {
+            NettyConnect.getUserChannelMap().remove(userId);
+        }
+        NettyConnect.getChannelGroup().remove(ctx.channel());
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.channel().close();
+    }
+
+    // 澶勭悊ws鏁版嵁
+    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+        // 澶勭悊鍏抽棴杩炴帴
+        if (frame instanceof CloseWebSocketFrame) {
+            NettyConnect.getChannelGroup().remove(ctx.channel());
+            ctx.close();
+            return;
+        }
+        if (frame instanceof TextWebSocketFrame) {
+            if ("ping".equals(((TextWebSocketFrame) frame).text())) {
+                ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+                return;
+            }
+            // 澶勭悊token锛屽彧鏈夊鎴风杩炴帴鐨勬椂鍊欐墠浼氬彂閫乻ocket娑堟伅
+            String token = ((TextWebSocketFrame) frame).text();
+            try {
+                Message message = new ObjectMapper().readValue(token, Message.class);
+                // 楠岃瘉token骞跺皢鐢ㄦ埛ID瀛樺叆鍒拌繛鎺ヤ腑
+                this.handleToken(message.getToken(), ctx);
+            } catch (JsonProcessingException e) {
+                log.error("娑堟伅鏍煎紡閿欒");
+            }
+        } else if (frame instanceof BinaryWebSocketFrame) {
+            // 澶勭悊浜岃繘鍒舵秷鎭�
+            // ...
+        } else if (frame instanceof PingWebSocketFrame) {
+            // 鐢ㄤ簬netty瀹炵幇鐨勫鎴风
+            // 澶勭悊 Ping 娑堟伅
+            // 鏀跺埌 Ping 娑堟伅锛屽洖搴斾竴涓� Pong 娑堟伅锛堣〃鏄庢垜杩樻椿鐫�锛�
+            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+        } else if (frame instanceof PongWebSocketFrame) {
+            // 澶勭悊 Pong 娑堟伅
+            // pong娑堟伅濡傛灉娌℃湁鐗瑰畾闇�姹傦紝涓嶇敤澶勭悊
+        } else if (frame instanceof ContinuationWebSocketFrame) {
+            // 澶勭悊杩炵画甯ф秷鎭紙姣旇緝澶х殑鏁版嵁锛屽垎鐗囷級
+            // ...
+        }
+    }
+
+    // 澶勭悊token
+    private Long handleToken(String token, ChannelHandlerContext ctx) {
+        if (!StringUtils.hasText(token)) {
+            NettyConnect.getChannelGroup().remove(ctx.channel());
+            throw new RuntimeException("闈炴硶鐨勮闂嚟璇�");
+        }
+        // 鑾峰彇 userId 鍙傛暟
+        LoginUser user = null;
+        user = tokenService.getUserInfoFromToken(token);
+        if (Objects.isNull(user)) {
+            NettyConnect.getChannelGroup().remove(ctx.channel());
+            throw new RuntimeException("鐢ㄦ埛涓嶅瓨鍦�");
+        }
+
+        Long userId = user.getUserId();
+        AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
+        ctx.channel().attr(userIdKey).set(userId);
+        NettyConnect.getUserChannelMap().put(userId, ctx.channel());
+
+        return userId;
+    }
+
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java b/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java
new file mode 100644
index 0000000..f1ad6a8
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java
@@ -0,0 +1,22 @@
+package com.ycl.websocket.msg;
+
+import enumeration.MsgTypeEnum;
+import lombok.Data;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/20 22:14
+ */
+@Data
+public class Message {
+
+    /** token */
+    private String token;
+
+    /** 娑堟伅鍐呭-json */
+    private String msg;
+
+    /** 娑堟伅绫诲瀷锛岀粰鍓嶇鍒ゅ畾澶勭悊鐨� */
+    private MsgTypeEnum type;
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java b/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java
new file mode 100644
index 0000000..5dc35da
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java
@@ -0,0 +1,63 @@
+package com.ycl.websocket.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ycl.websocket.msg.Message;
+import com.ycl.websocket.NettyConnect;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/21 9:39
+ */
+@Slf4j
+@Service
+public class DefaultSendWebsocketMsg implements SendWebsocketMsg {
+
+    private final static ObjectMapper json = new ObjectMapper();
+
+    @Override
+    public void sendByUserId(Long userId, Message msg) {
+        Channel channel = NettyConnect.getUserChannelMap().get(userId);
+        if (Objects.nonNull(channel) && channel.isActive()) {
+            try {
+                channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg)));
+            } catch (JsonProcessingException e) {
+                log.error("娑堟伅鍙戦�佸け璐ワ紝璇锋鏌ユ秷鎭牸寮�");
+            }
+        }
+    }
+
+    @Override
+    public void sendManyUser(List<Long> userIds, Message msg) {
+        if (CollectionUtils.isEmpty(userIds)) {
+            return;
+        }
+        for (Long userId : userIds) {
+            this.sendByUserId(userId, msg);
+        }
+    }
+
+    @Override
+    public void broadcast(Message msg) {
+        Collection<Channel> connects = NettyConnect.getUserChannelMap().values();
+        try {
+            String data = json.writeValueAsString(msg);
+            connects.stream().forEach(connect -> {
+                if (connect.isActive()) {
+                    connect.writeAndFlush(new TextWebSocketFrame(data));
+                }
+            });
+        } catch (JsonProcessingException e) {
+            log.error("娑堟伅鍙戦�佸け璐ワ紝璇锋鏌ユ秷鎭牸寮�");
+        }
+    }
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java b/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java
new file mode 100644
index 0000000..5f53923
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java
@@ -0,0 +1,35 @@
+package com.ycl.websocket.service;
+
+import com.ycl.websocket.msg.Message;
+
+import java.util.List;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/21 9:33
+ */
+public interface SendWebsocketMsg {
+
+    /**
+     * 鍙戠粰鎸囧畾鏌愪釜浜烘秷鎭�
+     *
+     * @param userId 鐢ㄦ埛ID
+     * @param msg 娑堟伅鍐呭
+     */
+    void sendByUserId(Long userId, Message msg);
+
+    /**
+     * 鍙戠粰鎸囧畾浜烘秷鎭�
+     *
+     * @param msg
+     */
+    void sendManyUser(List<Long> userIds, Message msg);
+
+    /**
+     * 骞挎挱娑堟伅
+     *
+     * @param msg
+     */
+    void broadcast(Message msg);
+
+}

--
Gitblit v1.8.0