package com.monkeylessey.websocket.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.monkeylessey.framework.utils.TokenUtil;
import com.monkeylessey.sys.domain.vo.SysUserVO;
import com.monkeylessey.websocket.Message;
import com.monkeylessey.websocket.NettyConnect;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Objects;

/**
 * Shared Netty handler for inbound WebSocket frames.
 *
 * <p>Every active channel is added to the channel group held by {@link NettyConnect}. When a
 * client sends a JSON text frame carrying a token, the token is resolved to a user through
 * {@link TokenUtil}; the user id is then stored as a channel attribute and the channel is
 * registered in {@code NettyConnect}'s user-channel map under that id.
 *
 * <p>The handler is marked {@code @Sharable}; it holds no per-connection state of its own —
 * per-connection data lives in channel attributes.
 */
@Slf4j
@ChannelHandler.Sharable
@Component
@RequiredArgsConstructor
public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /**
     * Channel attribute holding the authenticated user's id. The id is a {@code String}
     * (see {@link #handleToken}); the same key must be used for writing and reading so the
     * value is never read back with a different type.
     */
    private static final AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId");

    /** Reused for every message instead of building a new mapper per frame. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final TokenUtil tokenUtil;

    /**
     * Entry point for every inbound WebSocket frame; delegates to
     * {@link #handleWebSocketFrame}.
     *
     * @param ctx the channel context the frame arrived on
     * @param msg the inbound frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
        this.handleWebSocketFrame(ctx, msg);
    }

    /**
     * Adds a newly active channel to the shared channel group if it is not already a member.
     *
     * @param ctx the context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        if (!NettyConnect.getChannelGroup().contains(ctx.channel())) {
            NettyConnect.getChannelGroup().add(ctx.channel());
        }
    }

    /**
     * Cleans up after a disconnected client: drops the user-to-channel mapping (when the
     * channel had been authenticated) and removes the channel from the shared group.
     *
     * @param ctx the context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("有客户端断开连接了");
        String userId = ctx.channel().attr(USER_ID_KEY).get();
        if (Objects.nonNull(userId)) {
            // NOTE(review): this removes the mapping by user id only. If the same user can
            // reconnect on a new channel before the old one goes inactive, this may evict the
            // newer channel — confirm against NettyConnect and consider a conditional remove.
            NettyConnect.getUserChannelMap().remove(userId);
        }
        NettyConnect.getChannelGroup().remove(ctx.channel());
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx   the context of the channel on which the error occurred
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("WebSocket channel error, closing connection", cause);
        ctx.channel().close();
    }

    /**
     * Dispatches an inbound frame by type.
     *
     * <ul>
     *   <li>Close frame: remove the channel from the group and close it.</li>
     *   <li>Text frame {@code "ping"}: answer with a pong frame.</li>
     *   <li>Any other text frame: parse it as a JSON {@link Message} and authenticate its
     *       token.</li>
     *   <li>Ping frame: answer with a pong frame.</li>
     *   <li>Binary, pong and continuation frames: currently ignored.</li>
     * </ul>
     *
     * @param ctx   the channel context the frame arrived on
     * @param frame the inbound frame
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Client asked to close the connection.
        if (frame instanceof CloseWebSocketFrame) {
            NettyConnect.getChannelGroup().remove(ctx.channel());
            ctx.close();
            return;
        }

        if (frame instanceof TextWebSocketFrame) {
            String payload = ((TextWebSocketFrame) frame).text();

            // Application-level heartbeat sent as plain text.
            if ("ping".equals(payload)) {
                // retain(): the content is handed to the outbound pong while the inbound frame
                // is still released by SimpleChannelInboundHandler after this method returns.
                ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
                return;
            }

            // Clients only send a text message when connecting; it carries the token.
            try {
                Message message = OBJECT_MAPPER.readValue(payload, Message.class);
                // A JSON literal `null` deserializes to a null Message; treat it like a
                // missing token so it is rejected through the normal validation path.
                String token = Objects.isNull(message) ? null : message.getToken();
                // Validate the token and bind the user id to this connection.
                this.handleToken(token, ctx);
            } catch (JsonProcessingException e) {
                log.error("消息格式错误", e);
            }
        } else if (frame instanceof BinaryWebSocketFrame) {
            // Binary messages: not handled yet.
        } else if (frame instanceof PingWebSocketFrame) {
            // Protocol-level ping (e.g. from a Netty-based client): reply with a pong to
            // signal that this end is still alive.
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        } else if (frame instanceof PongWebSocketFrame) {
            // Pong frames need no handling unless there is a specific requirement.
        } else if (frame instanceof ContinuationWebSocketFrame) {
            // Continuation frames (large, fragmented payloads): not handled yet.
        }
    }

    /**
     * Validates the token and binds the resolved user to the channel: the user id is stored
     * as a channel attribute and the channel is registered in the user-channel map.
     *
     * <p>On any validation failure the channel is removed from the shared channel group
     * before the exception is thrown.
     *
     * @param token the access token sent by the client; may be {@code null} or blank
     * @param ctx   the context of the channel being authenticated
     * @return the id of the authenticated user
     * @throws RuntimeException if the token is blank, cannot be parsed, or resolves to no user
     */
    private String handleToken(String token, ChannelHandlerContext ctx) {
        if (!StringUtils.hasText(token)) {
            throw this.rejectChannel(ctx, "非法的访问凭证", null);
        }

        SysUserVO currentUserInfo;
        try {
            currentUserInfo = tokenUtil.getCurrentUserInfo(token);
        } catch (JsonProcessingException e) {
            throw this.rejectChannel(ctx, "非法的访问凭证", e);
        }
        if (Objects.isNull(currentUserInfo)) {
            throw this.rejectChannel(ctx, "用户不存在", null);
        }

        String userId = currentUserInfo.getId();
        ctx.channel().attr(USER_ID_KEY).set(userId);
        NettyConnect.getUserChannelMap().put(userId, ctx.channel());
        return userId;
    }

    /**
     * Removes the channel from the shared group and builds the exception describing why it
     * was rejected. The caller is expected to throw the returned exception.
     *
     * @param ctx     the context of the rejected channel
     * @param message the failure message
     * @param cause   the underlying error, or {@code null} when there is none
     * @return the exception to throw
     */
    private RuntimeException rejectChannel(ChannelHandlerContext ctx, String message, Throwable cause) {
        NettyConnect.getChannelGroup().remove(ctx.channel());
        return new RuntimeException(message, cause);
    }
}