From d4ef4a8eebc8b115bacd9b3170618fd024938042 Mon Sep 17 00:00:00 2001
From: xiangpei <xiangpei@timesnew.cn>
Date: 星期二, 23 四月 2024 17:29:02 +0800
Subject: [PATCH] websocket优化
---
ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java | 40 ++++
ycl-server/src/main/java/com/ycl/system/service/TokenService.java | 38 +++
/dev/null | 67 ------
ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java | 30 ++
ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java | 78 +++++++
ycl-common/src/main/java/enumeration/MsgTypeEnum.java | 19 +
ycl-server/src/main/java/com/ycl/websocket/msg/Message.java | 22 ++
ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java | 120 ++++++++++++
ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java | 63 ++++++
ycl-server/src/main/java/com/ycl/PlatformApplication.java | 8
ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java | 31 +++
ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java | 35 +++
12 files changed, 471 insertions(+), 80 deletions(-)
diff --git a/ycl-common/src/main/java/enumeration/MsgTypeEnum.java b/ycl-common/src/main/java/enumeration/MsgTypeEnum.java
new file mode 100644
index 0000000..2b3741c
--- /dev/null
+++ b/ycl-common/src/main/java/enumeration/MsgTypeEnum.java
@@ -0,0 +1,19 @@
+package enumeration;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/21 10:25
+ */
+public enum MsgTypeEnum {
+
+ ;
+
+ private final String value;
+
+ private final String desc;
+
+ MsgTypeEnum(String value, String desc) {
+ this.value = value;
+ this.desc = desc;
+ }
+}
diff --git a/ycl-server/src/main/java/com/ycl/PlatformApplication.java b/ycl-server/src/main/java/com/ycl/PlatformApplication.java
index 4f02f8a..ba6a7ac 100644
--- a/ycl-server/src/main/java/com/ycl/PlatformApplication.java
+++ b/ycl-server/src/main/java/com/ycl/PlatformApplication.java
@@ -1,8 +1,6 @@
package com.ycl;
-import com.ycl.websocket.WebsocketServer;
import lombok.extern.slf4j.Slf4j;
-import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
@@ -16,11 +14,5 @@
public static void main(String[] args) {
SpringApplication.run(PlatformApplication.class, args);
log.info("(鈾モ棤鈥库棤)锞夛緸 鑷础杩愮淮骞冲彴鍚姩鎴愬姛 醿�(麓凇`醿�)锞�");
- try {
- WebsocketServer.runWebsocket();
-
- } catch (Exception e) {
- log.info("websocket鍚姩澶辫触");
- }
}
}
diff --git a/ycl-server/src/main/java/com/ycl/system/service/TokenService.java b/ycl-server/src/main/java/com/ycl/system/service/TokenService.java
index 37c87dc..06c060f 100644
--- a/ycl-server/src/main/java/com/ycl/system/service/TokenService.java
+++ b/ycl-server/src/main/java/com/ycl/system/service/TokenService.java
@@ -83,6 +83,33 @@
return null;
}
+
+ /**
+ * 鑾峰彇鐢ㄦ埛韬唤淇℃伅
+ *
+ * @return 鐢ㄦ埛淇℃伅
+ */
+ public LoginUser getLoginUser(String token)
+ {
+ if (StringUtils.isNotEmpty(token))
+ {
+ try
+ {
+ Claims claims = parseToken(token);
+ // 瑙f瀽瀵瑰簲鐨勬潈闄愪互鍙婄敤鎴蜂俊鎭�
+ String uuid = (String) claims.get(Constants.LOGIN_USER_KEY);
+ String userKey = getTokenKey(uuid);
+ LoginUser user = redisCache.getCacheObject(userKey);
+ return user;
+ }
+ catch (Exception e)
+ {
+ log.error("鑾峰彇鐢ㄦ埛淇℃伅寮傚父'{}'", e.getMessage());
+ }
+ }
+ return null;
+ }
+
/**
* 璁剧疆鐢ㄦ埛韬唤淇℃伅
*/
@@ -210,6 +237,17 @@
}
/**
+ * 浠庝护鐗屼腑鑾峰彇鐢ㄦ埛鍚�
+ *
+ * @param token 浠ょ墝
+ * @return 鐢ㄦ埛鍚�
+ */
+ public LoginUser getUserInfoFromToken(String token)
+ {
+ return getLoginUser(token);
+ }
+
+ /**
* 鑾峰彇璇锋眰token
*
* @param request
diff --git a/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java b/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java
deleted file mode 100644
index f2ef8e1..0000000
--- a/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package com.ycl.websocket;
-
-import io.netty.channel.Channel;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author锛歺p
- * @date锛�2024/4/18 16:10
- */
-public class NettyConfig {
-
- /**
- * 瀹氫箟鍏ㄥ眬鍗曞埄channel缁� 绠$悊鎵�鏈塩hannel
- */
- private static volatile ChannelGroup channelGroup = null;
-
-
- /**
- * 瀛樻斁璇锋眰ID涓巆hannel鐨勫搴斿叧绯�
- */
- private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
-
-
- /**
- * 瀹氫箟涓ゆ妸閿�
- */
- private static final Object lock1 = new Object();
- private static final Object lock2 = new Object();
-
-
-
-
- public static ChannelGroup getChannelGroup() {
- if (null == channelGroup) {
- synchronized (lock1) {
- if (null == channelGroup) {
- channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- }
- }
- }
- return channelGroup;
- }
-
-
- public static ConcurrentHashMap<String, Channel> getChannelMap() {
- if (null == channelMap) {
- synchronized (lock2) {
- if (null == channelMap) {
- channelMap = new ConcurrentHashMap<>();
- }
- }
- }
- return channelMap;
- }
-
-
- public static Channel getChannel(String userId) {
- if (null == channelMap) {
- return getChannelMap().get(userId);
- }
- return channelMap.get(userId);
- }
-
-}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java b/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java
new file mode 100644
index 0000000..f998654
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java
@@ -0,0 +1,78 @@
+package com.ycl.websocket;
+
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/19 14:02
+ */
+public class NettyConnect {
+
+ /**
+ * netty鎻愪緵鐨勭鐞嗚繛鎺ラ泦鍚�
+ */
+ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ /**
+ * 瀛樻斁鐢ㄦ埛涓嶤hanel鐨勫搴斾俊鎭紝鐢ㄤ簬缁欐寚瀹氱敤鎴峰彂閫佹秷鎭�
+ */
+ private static ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(128);
+
+ private NettyConnect() {}
+
+ /**
+ * 鑾峰彇channel缁�
+ * @return
+ */
+ public static ChannelGroup getChannelGroup() {
+ return channelGroup;
+ }
+
+ /**
+ * 绉婚櫎channel鏃讹紝鍚屾椂绉婚櫎鐢ㄦ埛瀵瑰簲鍏崇郴
+ *
+ * @param channel
+ */
+ public static void removeChannel(Channel channel) {
+ Long userId = null;
+ for (Map.Entry<Long, Channel> channelEntry : userChannelMap.entrySet()) {
+ if (Objects.equals(channel, channelEntry.getValue())) {
+ userId = channelEntry.getKey();
+ break;
+ }
+ }
+ if (Objects.nonNull(userId)) {
+ userChannelMap.remove(userId);
+ }
+ channelGroup.remove(channel);
+ }
+
+ /**
+ * 鑾峰彇鐢ㄦ埛channel map
+ * @return
+ */
+ public static ConcurrentHashMap<Long, Channel> getUserChannelMap(){
+ return userChannelMap;
+ }
+
+ /**
+ * 绉婚櫎鐢ㄦ埛鍏崇郴鏃讹紝鍚屾椂绉婚櫎杩炴帴
+ *
+ * @param userId
+ */
+ public static void removeUserChannel(Integer userId) {
+ Channel channel = userChannelMap.get(userId);
+ if (Objects.nonNull(userId)) {
+ userChannelMap.remove(userId);
+ channelGroup.remove(channel);
+ }
+ }
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java b/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
deleted file mode 100644
index 351bf42..0000000
--- a/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.ycl.websocket;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.handler.codec.http.websocketx.*;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.util.Objects;
-
-
-public class WebSocketHandler extends SimpleChannelInboundHandler {
-
- public static ChannelGroup connects = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
- if(Objects.nonNull(o) && o instanceof WebSocketFrame){
- this.handleWebSocketFrame(ctx, (WebSocketFrame) o);
- }
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("鏈夋柊鐨勫鎴风杩炴帴涓婁簡");
- connects.add(ctx.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("鏈夊鎴风鏂紑杩炴帴浜�");
- connects.remove(ctx.channel());
- }
-
- // 澶勭悊ws鏁版嵁
- private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
- // 澶勭悊鍏抽棴杩炴帴
- if (frame instanceof CloseWebSocketFrame) {
- connects.remove(ctx.channel());
- ctx.close();
- return;
- }
- if (frame instanceof TextWebSocketFrame) {
- // 澶勭悊鏂囨湰娑堟伅
- String text = ((TextWebSocketFrame) frame).text();
- System.out.println("鏈嶅姟鍣ㄦ敹鍒板鎴风鏁版嵁锛�" +text);
- // 姝ゅ涓虹兢鍙戯紝鍗曠嫭鍙戝彲浣跨敤connects.find(ctx.channel().id()).writeAndFlush()鍙戦��
- connects.writeAndFlush(new TextWebSocketFrame("浣犲ソ瀹㈡埛绔�"));
- // ...
- } else if (frame instanceof BinaryWebSocketFrame) {
- // 澶勭悊浜岃繘鍒舵秷鎭�
- // ...
- } else if (frame instanceof PingWebSocketFrame) {
- // 澶勭悊 Ping 娑堟伅
- // 鏀跺埌 Ping 娑堟伅锛屽洖搴斾竴涓� Pong 娑堟伅锛堣〃鏄庢垜杩樻椿鐫�锛�
- ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
- } else if (frame instanceof PongWebSocketFrame) {
- // 澶勭悊 Pong 娑堟伅
- // pong娑堟伅濡傛灉娌℃湁鐗瑰畾闇�姹傦紝涓嶇敤澶勭悊
- } else if (frame instanceof ContinuationWebSocketFrame) {
- // 澶勭悊杩炵画甯ф秷鎭紙姣旇緝澶х殑鏁版嵁锛屽垎鐗囷級
- // ...
- }
- }
-
-}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java b/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java
new file mode 100644
index 0000000..7afce0f
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java
@@ -0,0 +1,31 @@
+package com.ycl.websocket;
+
+import com.ycl.system.service.TokenService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/14 10:56
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class WebSocketListener {
+
+ private final WebsocketServer websocketServer;
+
+ @EventListener(classes = {ApplicationReadyEvent.class})
+ public void runWebSocket() {
+ try {
+ websocketServer.runWebsocket();
+ } catch (Exception e) {
+ log.warn("websocket鍚姩澶辫触");
+ }
+ }
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java b/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
index a2d1603..4620d76 100644
--- a/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
+++ b/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
@@ -1,5 +1,7 @@
package com.ycl.websocket;
+import com.ycl.websocket.handler.HeartBeatHandler;
+import com.ycl.websocket.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -10,7 +12,12 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
/**
* webscoket 鏈嶅姟
@@ -19,9 +26,13 @@
* @date锛�2024/4/11 17:47
*/
@Slf4j
+@Component
+@RequiredArgsConstructor
public class WebsocketServer {
- public static void runWebsocket() throws Exception {
+ private final WebSocketHandler webSocketHandler;
+
+ public void runWebsocket() throws Exception {
// 澶勭悊 I/O 鎿嶄綔鐨勫绾跨▼浜嬩欢寰幆缁勶紙绾跨▼姹狅級銆俠ossGroup鐢ㄤ簬鎺ユ敹浼犲叆鐨勮繛鎺ワ紝workerGroup鐢ㄤ簬澶勭悊IO鎿嶄綔
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -38,22 +49,31 @@
// websocket鐨勬彙鎵嬮樁娈垫槸浣跨敤鐨凥ttp锛屾墍浠ラ渶瑕佹坊澶勭悊http璇锋眰锛�
// 鐢ㄤ簬灏� HTTP 璇锋眰鍜屽搷搴旇浆鎹负瀛楄妭娴佷互鍙婂皢瀛楄妭娴佽浆鎹负 HTTP 璇锋眰鍜屽搷搴�
ch.pipeline().addLast(new HttpServerCodec());
- ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 6000));
- ch.pipeline().addLast(new WebSocketHandler());
+ ch.pipeline().addLast(new ObjectEncoder());
+ // 浠ュ潡鐨勬柟寮忔潵鍐欑殑澶勭悊鍣�
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+// ch.pipeline().addLast(new NettyWebSocketParamHandler(secret));
+ // 閽堝瀹㈡埛绔紝鑻�10s鍐呮棤璇讳簨浠跺垯瑙﹀彂蹇冭烦澶勭悊鏂规硶HeartBeatHandler#userEventTriggered
+ ch.pipeline().addLast(new IdleStateHandler(60 , 60 , 60));
+ // 鑷畾涔夌┖闂茬姸鎬佹娴�(鑷畾涔夊績璺虫娴媓andler)
+ ch.pipeline().addLast(new HeartBeatHandler());
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(webSocketHandler);
}
})
// 璁剧疆鏈嶅姟鍣ㄩ�氶亾(涓婚�氶亾)鐨勯�夐」锛屾澶勬槸璁剧疆杩炴帴璇锋眰闃熷垪鐨勬渶澶ч暱搴︽槸128
.option(ChannelOption.SO_BACKLOG, 128);
// 缁戝畾鏈嶅姟鍣ㄥ埌鎸囧畾鐨勭鍙o紝骞朵笖绛夊緟缁戝畾鎿嶄綔瀹屾垚銆�
- ChannelFuture f = b.bind(8084).sync();
+ ChannelFuture f = b.bind(8044).sync();
log.info("websocket鍚姩鎴愬姛");
+ log.info("绋嬪簭鍚姩鎴愬姛");
// 绛夊緟鏈嶅姟鍣ㄧ殑閫氶亾鍏抽棴銆�
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
- log.info("websocket鍏抽棴");
+ log.error("websocket鍏抽棴");
}
}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java b/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java
new file mode 100644
index 0000000..1d8cecf
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java
@@ -0,0 +1,40 @@
+package com.ycl.websocket.handler;
+
+import com.ycl.websocket.NettyConnect;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+
+import java.util.Objects;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/19 14:41
+ */
+public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
+
+ private int lossConnectCount = 0;
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent){
+ IdleStateEvent event = (IdleStateEvent)evt;
+ if (event.state()== IdleState.READER_IDLE){
+ lossConnectCount ++;
+ if (lossConnectCount > 2){
+ AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
+ Long userId = ctx.channel().attr(userIdKey).get();
+ if (Objects.nonNull(userId)) {
+ NettyConnect.getUserChannelMap().remove(userId);
+ }
+ NettyConnect.getChannelGroup().remove(ctx.channel());
+ ctx.channel().close();
+ }
+ }
+ }else {
+ super.userEventTriggered(ctx,evt);
+ }
+ }
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java b/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java
new file mode 100644
index 0000000..4167508
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java
@@ -0,0 +1,120 @@
+package com.ycl.websocket.handler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ycl.system.model.LoginUser;
+import com.ycl.system.service.TokenService;
+import com.ycl.websocket.NettyConnect;
+import com.ycl.websocket.msg.Message;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.websocketx.*;
+import io.netty.util.AttributeKey;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.util.Objects;
+
+@Slf4j
+@ChannelHandler.Sharable
+@Component
+@RequiredArgsConstructor
+public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
+
+ private final TokenService tokenService;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
+ this.handleWebSocketFrame(ctx, msg);
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ log.info("鏈夊鎴风杩炴帴浜�");
+ if (!NettyConnect.getChannelGroup().contains(ctx.channel())) {
+ NettyConnect.getChannelGroup().add(ctx.channel());
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ log.info("鏈夊鎴风鏂紑杩炴帴浜�");
+ AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
+ Long userId = ctx.channel().attr(userIdKey).get();
+ if (Objects.nonNull(userId)) {
+ NettyConnect.getUserChannelMap().remove(userId);
+ }
+ NettyConnect.getChannelGroup().remove(ctx.channel());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ ctx.channel().close();
+ }
+
+ // 澶勭悊ws鏁版嵁
+ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+ // 澶勭悊鍏抽棴杩炴帴
+ if (frame instanceof CloseWebSocketFrame) {
+ NettyConnect.getChannelGroup().remove(ctx.channel());
+ ctx.close();
+ return;
+ }
+ if (frame instanceof TextWebSocketFrame) {
+ if ("ping".equals(((TextWebSocketFrame) frame).text())) {
+ ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+ return;
+ }
+ // 澶勭悊token锛屽彧鏈夊鎴风杩炴帴鐨勬椂鍊欐墠浼氬彂閫乻ocket娑堟伅
+ String token = ((TextWebSocketFrame) frame).text();
+ try {
+ Message message = new ObjectMapper().readValue(token, Message.class);
+ // 楠岃瘉token骞跺皢鐢ㄦ埛ID瀛樺叆鍒拌繛鎺ヤ腑
+ this.handleToken(message.getToken(), ctx);
+ } catch (JsonProcessingException e) {
+ log.error("娑堟伅鏍煎紡閿欒");
+ }
+ } else if (frame instanceof BinaryWebSocketFrame) {
+ // 澶勭悊浜岃繘鍒舵秷鎭�
+ // ...
+ } else if (frame instanceof PingWebSocketFrame) {
+ // 鐢ㄤ簬netty瀹炵幇鐨勫鎴风
+ // 澶勭悊 Ping 娑堟伅
+ // 鏀跺埌 Ping 娑堟伅锛屽洖搴斾竴涓� Pong 娑堟伅锛堣〃鏄庢垜杩樻椿鐫�锛�
+ ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+ } else if (frame instanceof PongWebSocketFrame) {
+ // 澶勭悊 Pong 娑堟伅
+ // pong娑堟伅濡傛灉娌℃湁鐗瑰畾闇�姹傦紝涓嶇敤澶勭悊
+ } else if (frame instanceof ContinuationWebSocketFrame) {
+ // 澶勭悊杩炵画甯ф秷鎭紙姣旇緝澶х殑鏁版嵁锛屽垎鐗囷級
+ // ...
+ }
+ }
+
+ // 澶勭悊token
+ private Long handleToken(String token, ChannelHandlerContext ctx) {
+ if (!StringUtils.hasText(token)) {
+ NettyConnect.getChannelGroup().remove(ctx.channel());
+ throw new RuntimeException("闈炴硶鐨勮闂嚟璇�");
+ }
+ // 鑾峰彇 userId 鍙傛暟
+ LoginUser user = null;
+ user = tokenService.getUserInfoFromToken(token);
+ if (Objects.isNull(user)) {
+ NettyConnect.getChannelGroup().remove(ctx.channel());
+ throw new RuntimeException("鐢ㄦ埛涓嶅瓨鍦�");
+ }
+
+ Long userId = user.getUserId();
+ AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
+ ctx.channel().attr(userIdKey).set(userId);
+ NettyConnect.getUserChannelMap().put(userId, ctx.channel());
+
+ return userId;
+ }
+
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java b/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java
new file mode 100644
index 0000000..f1ad6a8
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java
@@ -0,0 +1,22 @@
+package com.ycl.websocket.msg;
+
+import enumeration.MsgTypeEnum;
+import lombok.Data;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/20 22:14
+ */
+@Data
+public class Message {
+
+ /** token */
+ private String token;
+
+ /** 娑堟伅鍐呭-json */
+ private String msg;
+
+ /** 娑堟伅绫诲瀷锛岀粰鍓嶇鍒ゅ畾澶勭悊鐨� */
+ private MsgTypeEnum type;
+
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java b/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java
new file mode 100644
index 0000000..5dc35da
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java
@@ -0,0 +1,63 @@
+package com.ycl.websocket.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ycl.websocket.msg.Message;
+import com.ycl.websocket.NettyConnect;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/21 9:39
+ */
+@Slf4j
+@Service
+public class DefaultSendWebsocketMsg implements SendWebsocketMsg {
+
+ private final static ObjectMapper json = new ObjectMapper();
+
+ @Override
+ public void sendByUserId(Long userId, Message msg) {
+ Channel channel = NettyConnect.getUserChannelMap().get(userId);
+ if (Objects.nonNull(channel) && channel.isActive()) {
+ try {
+ channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg)));
+ } catch (JsonProcessingException e) {
+ log.error("娑堟伅鍙戦�佸け璐ワ紝璇锋鏌ユ秷鎭牸寮�");
+ }
+ }
+ }
+
+ @Override
+ public void sendManyUser(List<Long> userIds, Message msg) {
+ if (CollectionUtils.isEmpty(userIds)) {
+ return;
+ }
+ for (Long userId : userIds) {
+ this.sendByUserId(userId, msg);
+ }
+ }
+
+ @Override
+ public void broadcast(Message msg) {
+ Collection<Channel> connects = NettyConnect.getUserChannelMap().values();
+ try {
+ String data = json.writeValueAsString(msg);
+ connects.stream().forEach(connect -> {
+ if (connect.isActive()) {
+ connect.writeAndFlush(new TextWebSocketFrame(data));
+ }
+ });
+ } catch (JsonProcessingException e) {
+ log.error("娑堟伅鍙戦�佸け璐ワ紝璇锋鏌ユ秷鎭牸寮�");
+ }
+ }
+}
diff --git a/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java b/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java
new file mode 100644
index 0000000..5f53923
--- /dev/null
+++ b/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java
@@ -0,0 +1,35 @@
+package com.ycl.websocket.service;
+
+import com.ycl.websocket.msg.Message;
+
+import java.util.List;
+
+/**
+ * @author锛歺p
+ * @date锛�2024/4/21 9:33
+ */
+public interface SendWebsocketMsg {
+
+ /**
+ * 鍙戠粰鎸囧畾鏌愪釜浜烘秷鎭�
+ *
+ * @param userId 鐢ㄦ埛ID
+ * @param msg 娑堟伅鍐呭
+ */
+ void sendByUserId(Long userId, Message msg);
+
+ /**
+ * 鍙戠粰鎸囧畾浜烘秷鎭�
+ *
+ * @param msg
+ */
+ void sendManyUser(List<Long> userIds, Message msg);
+
+ /**
+ * 骞挎挱娑堟伅
+ *
+ * @param msg
+ */
+ void broadcast(Message msg);
+
+}
--
Gitblit v1.8.0