ycl-common/src/main/java/enumeration/MsgTypeEnum.java
New file @@ -0,0 +1,19 @@ package enumeration; /** * @author:xp * @date:2024/4/21 10:25 */ public enum MsgTypeEnum { ; private final String value; private final String desc; MsgTypeEnum(String value, String desc) { this.value = value; this.desc = desc; } } ycl-server/src/main/java/com/ycl/PlatformApplication.java
@@ -1,8 +1,6 @@ package com.ycl; import com.ycl.websocket.WebsocketServer; import lombok.extern.slf4j.Slf4j; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCaching; @@ -16,11 +14,5 @@ public static void main(String[] args) { SpringApplication.run(PlatformApplication.class, args); log.info("(♥◠‿◠)ノ゙ 自贡运维平台启动成功 ლ(´ڡ`ლ)゙"); try { WebsocketServer.runWebsocket(); } catch (Exception e) { log.info("websocket启动失败"); } } } ycl-server/src/main/java/com/ycl/system/service/TokenService.java
@@ -83,6 +83,33 @@ return null; } /** * 获取用户身份信息 * * @return 用户信息 */ public LoginUser getLoginUser(String token) { if (StringUtils.isNotEmpty(token)) { try { Claims claims = parseToken(token); // 解析对应的权限以及用户信息 String uuid = (String) claims.get(Constants.LOGIN_USER_KEY); String userKey = getTokenKey(uuid); LoginUser user = redisCache.getCacheObject(userKey); return user; } catch (Exception e) { log.error("获取用户信息异常'{}'", e.getMessage()); } } return null; } /** * 设置用户身份信息 */ @@ -210,6 +237,17 @@ } /** * 从令牌中获取用户名 * * @param token 令牌 * @return 用户名 */ public LoginUser getUserInfoFromToken(String token) { return getLoginUser(token); } /** * 获取请求token * * @param request ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java
File was deleted ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java
New file @@ -0,0 +1,78 @@ package com.ycl.websocket; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** * @author:xp * @date:2024/4/19 14:02 */ public class NettyConnect { /** * netty提供的管理连接集合 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用户与Chanel的对应信息,用于给指定用户发送消息 */ private static ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(128); private NettyConnect() {} /** * 获取channel组 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 移除channel时,同时移除用户对应关系 * * @param channel */ public static void removeChannel(Channel channel) { Long userId = null; for (Map.Entry<Long, Channel> channelEntry : userChannelMap.entrySet()) { if (Objects.equals(channel, channelEntry.getValue())) { userId = channelEntry.getKey(); break; } } if (Objects.nonNull(userId)) { userChannelMap.remove(userId); } channelGroup.remove(channel); } /** * 获取用户channel map * @return */ public static ConcurrentHashMap<Long, Channel> getUserChannelMap(){ return userChannelMap; } /** * 移除用户关系时,同时移除连接 * * @param userId */ public static void removeUserChannel(Integer userId) { Channel channel = userChannelMap.get(userId); if (Objects.nonNull(userId)) { userChannelMap.remove(userId); channelGroup.remove(channel); } } } ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
File was deleted ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java
New file @@ -0,0 +1,31 @@ package com.ycl.websocket; import com.ycl.system.service.TokenService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; /** * @author:xp * @date:2024/4/14 10:56 */ @Slf4j @Component @RequiredArgsConstructor public class WebSocketListener { private final WebsocketServer websocketServer; @EventListener(classes = {ApplicationReadyEvent.class}) public void runWebSocket() { try { websocketServer.runWebsocket(); } catch (Exception e) { log.warn("websocket启动失败"); } } } ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
@@ -1,5 +1,7 @@ package com.ycl.websocket; import com.ycl.websocket.handler.HeartBeatHandler; import com.ycl.websocket.handler.WebSocketHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -10,7 +12,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * webscoket 服务 @@ -19,9 +26,13 @@ * @date:2024/4/11 17:47 */ @Slf4j @Component @RequiredArgsConstructor public class WebsocketServer { public static void runWebsocket() throws Exception { private final WebSocketHandler webSocketHandler; public void runWebsocket() throws Exception { // 处理 I/O 操作的多线程事件循环组(线程池)。bossGroup用于接收传入的连接,workerGroup用于处理IO操作 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -38,22 +49,31 @@ // websocket的握手阶段是使用的Http,所以需要添处理http请求: // 用于将 HTTP 请求和响应转换为字节流以及将字节流转换为 HTTP 请求和响应 ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 6000)); ch.pipeline().addLast(new WebSocketHandler()); ch.pipeline().addLast(new ObjectEncoder()); // 以块的方式来写的处理器 ch.pipeline().addLast(new ChunkedWriteHandler()); // ch.pipeline().addLast(new NettyWebSocketParamHandler(secret)); // 针对客户端,若10s内无读事件则触发心跳处理方法HeartBeatHandler#userEventTriggered ch.pipeline().addLast(new IdleStateHandler(60 , 60 , 60)); // 自定义空闲状态检测(自定义心跳检测handler) ch.pipeline().addLast(new HeartBeatHandler()); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); ch.pipeline().addLast(webSocketHandler); } }) // 设置服务器通道(主通道)的选项,此处是设置连接请求队列的最大长度是128 .option(ChannelOption.SO_BACKLOG, 128); // 绑定服务器到指定的端口,并且等待绑定操作完成。 ChannelFuture f = b.bind(8084).sync(); ChannelFuture f = b.bind(8044).sync(); log.info("websocket启动成功"); log.info("程序启动成功"); // 等待服务器的通道关闭。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.info("websocket关闭"); log.error("websocket关闭"); } } ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java
New file @@ -0,0 +1,40 @@ package com.ycl.websocket.handler; import com.ycl.websocket.NettyConnect; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import java.util.Objects; /** * @author:xp * @date:2024/4/19 14:41 */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { private int lossConnectCount = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.READER_IDLE){ lossConnectCount ++; if (lossConnectCount > 2){ AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId"); Long userId = ctx.channel().attr(userIdKey).get(); if (Objects.nonNull(userId)) { NettyConnect.getUserChannelMap().remove(userId); } NettyConnect.getChannelGroup().remove(ctx.channel()); ctx.channel().close(); } } }else { super.userEventTriggered(ctx,evt); } } } ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java
New file @@ -0,0 +1,120 @@ package com.ycl.websocket.handler; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.ycl.system.model.LoginUser; import com.ycl.system.service.TokenService; import com.ycl.websocket.NettyConnect; import com.ycl.websocket.msg.Message; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.AttributeKey; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.Objects; @Slf4j @ChannelHandler.Sharable @Component @RequiredArgsConstructor public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private final TokenService tokenService; @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) { this.handleWebSocketFrame(ctx, msg); } @Override public void channelActive(ChannelHandlerContext ctx) { log.info("有客户端连接了"); if (!NettyConnect.getChannelGroup().contains(ctx.channel())) { NettyConnect.getChannelGroup().add(ctx.channel()); } } @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("有客户端断开连接了"); AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId"); Long userId = ctx.channel().attr(userIdKey).get(); if (Objects.nonNull(userId)) { NettyConnect.getUserChannelMap().remove(userId); } NettyConnect.getChannelGroup().remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.channel().close(); } // 处理ws数据 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 处理关闭连接 if (frame instanceof CloseWebSocketFrame) { NettyConnect.getChannelGroup().remove(ctx.channel()); ctx.close(); return; } if (frame instanceof TextWebSocketFrame) { if ("ping".equals(((TextWebSocketFrame) frame).text())) { ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); return; } // 处理token,只有客户端连接的时候才会发送socket消息 String token = ((TextWebSocketFrame) frame).text(); try { Message message = new ObjectMapper().readValue(token, Message.class); // 验证token并将用户ID存入到连接中 this.handleToken(message.getToken(), ctx); } catch (JsonProcessingException e) { log.error("消息格式错误"); } } else if (frame instanceof BinaryWebSocketFrame) { // 处理二进制消息 // ... } else if (frame instanceof PingWebSocketFrame) { // 用于netty实现的客户端 // 处理 Ping 消息 // 收到 Ping 消息,回应一个 Pong 消息(表明我还活着) ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); } else if (frame instanceof PongWebSocketFrame) { // 处理 Pong 消息 // pong消息如果没有特定需求,不用处理 } else if (frame instanceof ContinuationWebSocketFrame) { // 处理连续帧消息(比较大的数据,分片) // ... } } // 处理token private Long handleToken(String token, ChannelHandlerContext ctx) { if (!StringUtils.hasText(token)) { NettyConnect.getChannelGroup().remove(ctx.channel()); throw new RuntimeException("非法的访问凭证"); } // 获取 userId 参数 LoginUser user = null; user = tokenService.getUserInfoFromToken(token); if (Objects.isNull(user)) { NettyConnect.getChannelGroup().remove(ctx.channel()); throw new RuntimeException("用户不存在"); } Long userId = user.getUserId(); AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId"); ctx.channel().attr(userIdKey).set(userId); NettyConnect.getUserChannelMap().put(userId, ctx.channel()); return userId; } } ycl-server/src/main/java/com/ycl/websocket/msg/Message.java
New file @@ -0,0 +1,22 @@ package com.ycl.websocket.msg; import enumeration.MsgTypeEnum; import lombok.Data; /** * @author:xp * @date:2024/4/20 22:14 */ @Data public class Message { /** token */ private String token; /** 消息内容-json */ private String msg; /** 消息类型,给前端判定处理的 */ private MsgTypeEnum type; } ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java
New file @@ -0,0 +1,63 @@ package com.ycl.websocket.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.ycl.websocket.msg.Message; import com.ycl.websocket.NettyConnect; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.util.Collection; import java.util.List; import java.util.Objects; /** * @author:xp * @date:2024/4/21 9:39 */ @Slf4j @Service public class DefaultSendWebsocketMsg implements SendWebsocketMsg { private final static ObjectMapper json = new ObjectMapper(); @Override public void sendByUserId(Long userId, Message msg) { Channel channel = NettyConnect.getUserChannelMap().get(userId); if (Objects.nonNull(channel) && channel.isActive()) { try { channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg))); } catch (JsonProcessingException e) { log.error("消息发送失败,请检查消息格式"); } } } @Override public void sendManyUser(List<Long> userIds, Message msg) { if (CollectionUtils.isEmpty(userIds)) { return; } for (Long userId : userIds) { this.sendByUserId(userId, msg); } } @Override public void broadcast(Message msg) { Collection<Channel> connects = NettyConnect.getUserChannelMap().values(); try { String data = json.writeValueAsString(msg); connects.stream().forEach(connect -> { if (connect.isActive()) { connect.writeAndFlush(new TextWebSocketFrame(data)); } }); } catch (JsonProcessingException e) { log.error("消息发送失败,请检查消息格式"); } } } ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java
New file @@ -0,0 +1,35 @@ package com.ycl.websocket.service; import com.ycl.websocket.msg.Message; import java.util.List; /** * @author:xp * @date:2024/4/21 9:33 */ public interface SendWebsocketMsg { /** * 发给指定某个人消息 * * @param userId 用户ID * @param msg 消息内容 */ void sendByUserId(Long userId, Message msg); /** * 发给指定人消息 * * @param msg */ void sendManyUser(List<Long> userIds, Message msg); /** * 广播消息 * * @param msg */ void broadcast(Message msg); }