From d4ef4a8eebc8b115bacd9b3170618fd024938042 Mon Sep 17 00:00:00 2001 From: xiangpei <xiangpei@timesnew.cn> Date: 星期二, 23 四月 2024 17:29:02 +0800 Subject: [PATCH] websocket优化 --- ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java | 40 ++++ ycl-server/src/main/java/com/ycl/system/service/TokenService.java | 38 +++ /dev/null | 67 ------ ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java | 30 ++ ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java | 78 +++++++ ycl-common/src/main/java/enumeration/MsgTypeEnum.java | 19 + ycl-server/src/main/java/com/ycl/websocket/msg/Message.java | 22 ++ ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java | 120 ++++++++++++ ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java | 63 ++++++ ycl-server/src/main/java/com/ycl/PlatformApplication.java | 8 ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java | 31 +++ ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java | 35 +++ 12 files changed, 471 insertions(+), 80 deletions(-) diff --git a/ycl-common/src/main/java/enumeration/MsgTypeEnum.java b/ycl-common/src/main/java/enumeration/MsgTypeEnum.java new file mode 100644 index 0000000..2b3741c --- /dev/null +++ b/ycl-common/src/main/java/enumeration/MsgTypeEnum.java @@ -0,0 +1,19 @@ +package enumeration; + +/** + * @author锛歺p + * @date锛�2024/4/21 10:25 + */ +public enum MsgTypeEnum { + + ; + + private final String value; + + private final String desc; + + MsgTypeEnum(String value, String desc) { + this.value = value; + this.desc = desc; + } +} diff --git a/ycl-server/src/main/java/com/ycl/PlatformApplication.java b/ycl-server/src/main/java/com/ycl/PlatformApplication.java index 4f02f8a..ba6a7ac 100644 --- a/ycl-server/src/main/java/com/ycl/PlatformApplication.java +++ b/ycl-server/src/main/java/com/ycl/PlatformApplication.java @@ -1,8 +1,6 @@ package com.ycl; -import com.ycl.websocket.WebsocketServer; import lombok.extern.slf4j.Slf4j; -import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCaching; @@ -16,11 +14,5 @@ public static void main(String[] args) { SpringApplication.run(PlatformApplication.class, args); log.info("(鈾モ棤鈥库棤)锞夛緸 鑷础杩愮淮骞冲彴鍚姩鎴愬姛 醿�(麓凇`醿�)锞�"); - try { - WebsocketServer.runWebsocket(); - - } catch (Exception e) { - log.info("websocket鍚姩澶辫触"); - } } } diff --git a/ycl-server/src/main/java/com/ycl/system/service/TokenService.java b/ycl-server/src/main/java/com/ycl/system/service/TokenService.java index 37c87dc..06c060f 100644 --- a/ycl-server/src/main/java/com/ycl/system/service/TokenService.java +++ b/ycl-server/src/main/java/com/ycl/system/service/TokenService.java @@ -83,6 +83,33 @@ return null; } + + /** + * 鑾峰彇鐢ㄦ埛韬唤淇℃伅 + * + * @return 鐢ㄦ埛淇℃伅 + */ + public LoginUser getLoginUser(String token) + { + if (StringUtils.isNotEmpty(token)) + { + try + { + Claims claims = parseToken(token); + // 瑙f瀽瀵瑰簲鐨勬潈闄愪互鍙婄敤鎴蜂俊鎭� + String uuid = (String) claims.get(Constants.LOGIN_USER_KEY); + String userKey = getTokenKey(uuid); + LoginUser user = redisCache.getCacheObject(userKey); + return user; + } + catch (Exception e) + { + log.error("鑾峰彇鐢ㄦ埛淇℃伅寮傚父'{}'", e.getMessage()); + } + } + return null; + } + /** * 璁剧疆鐢ㄦ埛韬唤淇℃伅 */ @@ -210,6 +237,17 @@ } /** + * 浠庝护鐗屼腑鑾峰彇鐢ㄦ埛鍚� + * + * @param token 浠ょ墝 + * @return 鐢ㄦ埛鍚� + */ + public LoginUser getUserInfoFromToken(String token) + { + return getLoginUser(token); + } + + /** * 鑾峰彇璇锋眰token * * @param request diff --git a/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java b/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java deleted file mode 100644 index f2ef8e1..0000000 --- a/ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.ycl.websocket; - -import io.netty.channel.Channel; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.util.concurrent.GlobalEventExecutor; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author锛歺p - * @date锛�2024/4/18 16:10 - */ -public class NettyConfig { - - /** - * 瀹氫箟鍏ㄥ眬鍗曞埄channel缁� 绠$悊鎵�鏈塩hannel - */ - private static volatile ChannelGroup channelGroup = null; - - - /** - * 瀛樻斁璇锋眰ID涓巆hannel鐨勫搴斿叧绯� - */ - private static volatile ConcurrentHashMap<String, Channel> channelMap = null; - - - /** - * 瀹氫箟涓ゆ妸閿� - */ - private static final Object lock1 = new Object(); - private static final Object lock2 = new Object(); - - - - - public static ChannelGroup getChannelGroup() { - if (null == channelGroup) { - synchronized (lock1) { - if (null == channelGroup) { - channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - } - } - } - return channelGroup; - } - - - public static ConcurrentHashMap<String, Channel> getChannelMap() { - if (null == channelMap) { - synchronized (lock2) { - if (null == channelMap) { - channelMap = new ConcurrentHashMap<>(); - } - } - } - return channelMap; - } - - - public static Channel getChannel(String userId) { - if (null == channelMap) { - return getChannelMap().get(userId); - } - return channelMap.get(userId); - } - -} diff --git a/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java b/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java new file mode 100644 index 0000000..f998654 --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java @@ -0,0 +1,78 @@ +package com.ycl.websocket; + +import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author锛歺p + * @date锛�2024/4/19 14:02 + */ +public class NettyConnect { + + /** + * netty鎻愪緵鐨勭鐞嗚繛鎺ラ泦鍚� + */ + private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + /** + * 瀛樻斁鐢ㄦ埛涓嶤hanel鐨勫搴斾俊鎭紝鐢ㄤ簬缁欐寚瀹氱敤鎴峰彂閫佹秷鎭� + */ + private static ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(128); + + private NettyConnect() {} + + /** + * 鑾峰彇channel缁� + * @return + */ + public static ChannelGroup getChannelGroup() { + return channelGroup; + } + + /** + * 绉婚櫎channel鏃讹紝鍚屾椂绉婚櫎鐢ㄦ埛瀵瑰簲鍏崇郴 + * + * @param channel + */ + public static void removeChannel(Channel channel) { + Long userId = null; + for (Map.Entry<Long, Channel> channelEntry : userChannelMap.entrySet()) { + if (Objects.equals(channel, channelEntry.getValue())) { + userId = channelEntry.getKey(); + break; + } + } + if (Objects.nonNull(userId)) { + userChannelMap.remove(userId); + } + channelGroup.remove(channel); + } + + /** + * 鑾峰彇鐢ㄦ埛channel map + * @return + */ + public static ConcurrentHashMap<Long, Channel> getUserChannelMap(){ + return userChannelMap; + } + + /** + * 绉婚櫎鐢ㄦ埛鍏崇郴鏃讹紝鍚屾椂绉婚櫎杩炴帴 + * + * @param userId + */ + public static void removeUserChannel(Integer userId) { + Channel channel = userChannelMap.get(userId); + if (Objects.nonNull(userId)) { + userChannelMap.remove(userId); + channelGroup.remove(channel); + } + } + +} diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java b/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java deleted file mode 100644 index 351bf42..0000000 --- a/ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.ycl.websocket; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.handler.codec.http.websocketx.*; -import io.netty.util.concurrent.GlobalEventExecutor; - -import java.util.Objects; - - -public class WebSocketHandler extends SimpleChannelInboundHandler { - - public static ChannelGroup connects = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception { - if(Objects.nonNull(o) && o instanceof WebSocketFrame){ - this.handleWebSocketFrame(ctx, (WebSocketFrame) o); - } - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("鏈夋柊鐨勫鎴风杩炴帴涓婁簡"); - connects.add(ctx.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - System.out.println("鏈夊鎴风鏂紑杩炴帴浜�"); - connects.remove(ctx.channel()); - } - - // 澶勭悊ws鏁版嵁 - private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { - // 澶勭悊鍏抽棴杩炴帴 - if (frame instanceof CloseWebSocketFrame) { - connects.remove(ctx.channel()); - ctx.close(); - return; - } - if (frame instanceof TextWebSocketFrame) { - // 澶勭悊鏂囨湰娑堟伅 - String text = ((TextWebSocketFrame) frame).text(); - System.out.println("鏈嶅姟鍣ㄦ敹鍒板鎴风鏁版嵁锛�" +text); - // 姝ゅ涓虹兢鍙戯紝鍗曠嫭鍙戝彲浣跨敤connects.find(ctx.channel().id()).writeAndFlush()鍙戦�� - connects.writeAndFlush(new TextWebSocketFrame("浣犲ソ瀹㈡埛绔�")); - // ... - } else if (frame instanceof BinaryWebSocketFrame) { - // 澶勭悊浜岃繘鍒舵秷鎭� - // ... - } else if (frame instanceof PingWebSocketFrame) { - // 澶勭悊 Ping 娑堟伅 - // 鏀跺埌 Ping 娑堟伅锛屽洖搴斾竴涓� Pong 娑堟伅锛堣〃鏄庢垜杩樻椿鐫�锛� - ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); - } else if (frame instanceof PongWebSocketFrame) { - // 澶勭悊 Pong 娑堟伅 - // pong娑堟伅濡傛灉娌℃湁鐗瑰畾闇�姹傦紝涓嶇敤澶勭悊 - } else if (frame instanceof ContinuationWebSocketFrame) { - // 澶勭悊杩炵画甯ф秷鎭紙姣旇緝澶х殑鏁版嵁锛屽垎鐗囷級 - // ... - } - } - -} diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java b/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java new file mode 100644 index 0000000..7afce0f --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java @@ -0,0 +1,31 @@ +package com.ycl.websocket; + +import com.ycl.system.service.TokenService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +/** + * @author锛歺p + * @date锛�2024/4/14 10:56 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class WebSocketListener { + + private final WebsocketServer websocketServer; + + @EventListener(classes = {ApplicationReadyEvent.class}) + public void runWebSocket() { + try { + websocketServer.runWebsocket(); + } catch (Exception e) { + log.warn("websocket鍚姩澶辫触"); + } + } + +} diff --git a/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java b/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java index a2d1603..4620d76 100644 --- a/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java +++ b/ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java @@ -1,5 +1,7 @@ package com.ycl.websocket; +import com.ycl.websocket.handler.HeartBeatHandler; +import com.ycl.websocket.handler.WebSocketHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -10,7 +12,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.serialization.ObjectEncoder; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; /** * webscoket 鏈嶅姟 @@ -19,9 +26,13 @@ * @date锛�2024/4/11 17:47 */ @Slf4j +@Component +@RequiredArgsConstructor public class WebsocketServer { - public static void runWebsocket() throws Exception { + private final WebSocketHandler webSocketHandler; + + public void runWebsocket() throws Exception { // 澶勭悊 I/O 鎿嶄綔鐨勫绾跨▼浜嬩欢寰幆缁勶紙绾跨▼姹狅級銆俠ossGroup鐢ㄤ簬鎺ユ敹浼犲叆鐨勮繛鎺ワ紝workerGroup鐢ㄤ簬澶勭悊IO鎿嶄綔 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -38,22 +49,31 @@ // websocket鐨勬彙鎵嬮樁娈垫槸浣跨敤鐨凥ttp锛屾墍浠ラ渶瑕佹坊澶勭悊http璇锋眰锛� // 鐢ㄤ簬灏� HTTP 璇锋眰鍜屽搷搴旇浆鎹负瀛楄妭娴佷互鍙婂皢瀛楄妭娴佽浆鎹负 HTTP 璇锋眰鍜屽搷搴� ch.pipeline().addLast(new HttpServerCodec()); - ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 6000)); - ch.pipeline().addLast(new WebSocketHandler()); + ch.pipeline().addLast(new ObjectEncoder()); + // 浠ュ潡鐨勬柟寮忔潵鍐欑殑澶勭悊鍣� + ch.pipeline().addLast(new ChunkedWriteHandler()); +// ch.pipeline().addLast(new NettyWebSocketParamHandler(secret)); + // 閽堝瀹㈡埛绔紝鑻�10s鍐呮棤璇讳簨浠跺垯瑙﹀彂蹇冭烦澶勭悊鏂规硶HeartBeatHandler#userEventTriggered + ch.pipeline().addLast(new IdleStateHandler(60 , 60 , 60)); + // 鑷畾涔夌┖闂茬姸鎬佹娴�(鑷畾涔夊績璺虫娴媓andler) + ch.pipeline().addLast(new HeartBeatHandler()); + ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); + ch.pipeline().addLast(webSocketHandler); } }) // 璁剧疆鏈嶅姟鍣ㄩ�氶亾(涓婚�氶亾)鐨勯�夐」锛屾澶勬槸璁剧疆杩炴帴璇锋眰闃熷垪鐨勬渶澶ч暱搴︽槸128 .option(ChannelOption.SO_BACKLOG, 128); // 缁戝畾鏈嶅姟鍣ㄥ埌鎸囧畾鐨勭鍙o紝骞朵笖绛夊緟缁戝畾鎿嶄綔瀹屾垚銆� - ChannelFuture f = b.bind(8084).sync(); + ChannelFuture f = b.bind(8044).sync(); log.info("websocket鍚姩鎴愬姛"); + log.info("绋嬪簭鍚姩鎴愬姛"); // 绛夊緟鏈嶅姟鍣ㄧ殑閫氶亾鍏抽棴銆� f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); - log.info("websocket鍏抽棴"); + log.error("websocket鍏抽棴"); } } diff --git a/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java b/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java new file mode 100644 index 0000000..1d8cecf --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java @@ -0,0 +1,40 @@ +package com.ycl.websocket.handler; + +import com.ycl.websocket.NettyConnect; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.AttributeKey; + +import java.util.Objects; + +/** + * @author锛歺p + * @date锛�2024/4/19 14:41 + */ +public class HeartBeatHandler extends ChannelInboundHandlerAdapter { + + private int lossConnectCount = 0; + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent){ + IdleStateEvent event = (IdleStateEvent)evt; + if (event.state()== IdleState.READER_IDLE){ + lossConnectCount ++; + if (lossConnectCount > 2){ + AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId"); + Long userId = ctx.channel().attr(userIdKey).get(); + if (Objects.nonNull(userId)) { + NettyConnect.getUserChannelMap().remove(userId); + } + NettyConnect.getChannelGroup().remove(ctx.channel()); + ctx.channel().close(); + } + } + }else { + super.userEventTriggered(ctx,evt); + } + } + +} diff --git a/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java b/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java new file mode 100644 index 0000000..4167508 --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java @@ -0,0 +1,120 @@ +package com.ycl.websocket.handler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycl.system.model.LoginUser; +import com.ycl.system.service.TokenService; +import com.ycl.websocket.NettyConnect; +import com.ycl.websocket.msg.Message; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.websocketx.*; +import io.netty.util.AttributeKey; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import java.util.Objects; + +@Slf4j +@ChannelHandler.Sharable +@Component +@RequiredArgsConstructor +public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> { + + private final TokenService tokenService; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) { + this.handleWebSocketFrame(ctx, msg); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + log.info("鏈夊鎴风杩炴帴浜�"); + if (!NettyConnect.getChannelGroup().contains(ctx.channel())) { + NettyConnect.getChannelGroup().add(ctx.channel()); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + log.info("鏈夊鎴风鏂紑杩炴帴浜�"); + AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId"); + Long userId = ctx.channel().attr(userIdKey).get(); + if (Objects.nonNull(userId)) { + NettyConnect.getUserChannelMap().remove(userId); + } + NettyConnect.getChannelGroup().remove(ctx.channel()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.channel().close(); + } + + // 澶勭悊ws鏁版嵁 + private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { + // 澶勭悊鍏抽棴杩炴帴 + if (frame instanceof CloseWebSocketFrame) { + NettyConnect.getChannelGroup().remove(ctx.channel()); + ctx.close(); + return; + } + if (frame instanceof TextWebSocketFrame) { + if ("ping".equals(((TextWebSocketFrame) frame).text())) { + ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); + return; + } + // 澶勭悊token锛屽彧鏈夊鎴风杩炴帴鐨勬椂鍊欐墠浼氬彂閫乻ocket娑堟伅 + String token = ((TextWebSocketFrame) frame).text(); + try { + Message message = new ObjectMapper().readValue(token, Message.class); + // 楠岃瘉token骞跺皢鐢ㄦ埛ID瀛樺叆鍒拌繛鎺ヤ腑 + this.handleToken(message.getToken(), ctx); + } catch (JsonProcessingException e) { + log.error("娑堟伅鏍煎紡閿欒"); + } + } else if (frame instanceof BinaryWebSocketFrame) { + // 澶勭悊浜岃繘鍒舵秷鎭� + // ... + } else if (frame instanceof PingWebSocketFrame) { + // 鐢ㄤ簬netty瀹炵幇鐨勫鎴风 + // 澶勭悊 Ping 娑堟伅 + // 鏀跺埌 Ping 娑堟伅锛屽洖搴斾竴涓� Pong 娑堟伅锛堣〃鏄庢垜杩樻椿鐫�锛� + ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); + } else if (frame instanceof PongWebSocketFrame) { + // 澶勭悊 Pong 娑堟伅 + // pong娑堟伅濡傛灉娌℃湁鐗瑰畾闇�姹傦紝涓嶇敤澶勭悊 + } else if (frame instanceof ContinuationWebSocketFrame) { + // 澶勭悊杩炵画甯ф秷鎭紙姣旇緝澶х殑鏁版嵁锛屽垎鐗囷級 + // ... + } + } + + // 澶勭悊token + private Long handleToken(String token, ChannelHandlerContext ctx) { + if (!StringUtils.hasText(token)) { + NettyConnect.getChannelGroup().remove(ctx.channel()); + throw new RuntimeException("闈炴硶鐨勮闂嚟璇�"); + } + // 鑾峰彇 userId 鍙傛暟 + LoginUser user = null; + user = tokenService.getUserInfoFromToken(token); + if (Objects.isNull(user)) { + NettyConnect.getChannelGroup().remove(ctx.channel()); + throw new RuntimeException("鐢ㄦ埛涓嶅瓨鍦�"); + } + + Long userId = user.getUserId(); + AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId"); + ctx.channel().attr(userIdKey).set(userId); + NettyConnect.getUserChannelMap().put(userId, ctx.channel()); + + return userId; + } + + +} diff --git a/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java b/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java new file mode 100644 index 0000000..f1ad6a8 --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/msg/Message.java @@ -0,0 +1,22 @@ +package com.ycl.websocket.msg; + +import enumeration.MsgTypeEnum; +import lombok.Data; + +/** + * @author锛歺p + * @date锛�2024/4/20 22:14 + */ +@Data +public class Message { + + /** token */ + private String token; + + /** 娑堟伅鍐呭-json */ + private String msg; + + /** 娑堟伅绫诲瀷锛岀粰鍓嶇鍒ゅ畾澶勭悊鐨� */ + private MsgTypeEnum type; + +} diff --git a/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java b/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java new file mode 100644 index 0000000..5dc35da --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java @@ -0,0 +1,63 @@ +package com.ycl.websocket.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycl.websocket.msg.Message; +import com.ycl.websocket.NettyConnect; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +/** + * @author锛歺p + * @date锛�2024/4/21 9:39 + */ +@Slf4j +@Service +public class DefaultSendWebsocketMsg implements SendWebsocketMsg { + + private final static ObjectMapper json = new ObjectMapper(); + + @Override + public void sendByUserId(Long userId, Message msg) { + Channel channel = NettyConnect.getUserChannelMap().get(userId); + if (Objects.nonNull(channel) && channel.isActive()) { + try { + channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg))); + } catch (JsonProcessingException e) { + log.error("娑堟伅鍙戦�佸け璐ワ紝璇锋鏌ユ秷鎭牸寮�"); + } + } + } + + @Override + public void sendManyUser(List<Long> userIds, Message msg) { + if (CollectionUtils.isEmpty(userIds)) { + return; + } + for (Long userId : userIds) { + this.sendByUserId(userId, msg); + } + } + + @Override + public void broadcast(Message msg) { + Collection<Channel> connects = NettyConnect.getUserChannelMap().values(); + try { + String data = json.writeValueAsString(msg); + connects.stream().forEach(connect -> { + if (connect.isActive()) { + connect.writeAndFlush(new TextWebSocketFrame(data)); + } + }); + } catch (JsonProcessingException e) { + log.error("娑堟伅鍙戦�佸け璐ワ紝璇锋鏌ユ秷鎭牸寮�"); + } + } +} diff --git a/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java b/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java new file mode 100644 index 0000000..5f53923 --- /dev/null +++ b/ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java @@ -0,0 +1,35 @@ +package com.ycl.websocket.service; + +import com.ycl.websocket.msg.Message; + +import java.util.List; + +/** + * @author锛歺p + * @date锛�2024/4/21 9:33 + */ +public interface SendWebsocketMsg { + + /** + * 鍙戠粰鎸囧畾鏌愪釜浜烘秷鎭� + * + * @param userId 鐢ㄦ埛ID + * @param msg 娑堟伅鍐呭 + */ + void sendByUserId(Long userId, Message msg); + + /** + * 鍙戠粰鎸囧畾浜烘秷鎭� + * + * @param msg + */ + void sendManyUser(List<Long> userIds, Message msg); + + /** + * 骞挎挱娑堟伅 + * + * @param msg + */ + void broadcast(Message msg); + +} -- Gitblit v1.8.0