package com.monkeylessey.websocket.handler;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.monkeylessey.framework.utils.TokenUtil;
|
import com.monkeylessey.sys.domain.vo.SysUserVO;
|
import com.monkeylessey.websocket.Message;
|
import com.monkeylessey.websocket.NettyConnect;
|
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.handler.codec.http.websocketx.*;
|
import io.netty.util.AttributeKey;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.StringUtils;
|
|
import java.util.Objects;
|
|
@Slf4j
|
@ChannelHandler.Sharable
|
@Component
|
@RequiredArgsConstructor
|
public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
|
|
private final TokenUtil tokenUtil;
|
|
|
@Override
|
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
|
this.handleWebSocketFrame(ctx, msg);
|
}
|
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) {
|
if (!NettyConnect.getChannelGroup().contains(ctx.channel())) {
|
NettyConnect.getChannelGroup().add(ctx.channel());
|
}
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) {
|
log.info("有客户端断开连接了");
|
AttributeKey<Integer> userIdKey = AttributeKey.valueOf("userId");
|
Integer userId = ctx.channel().attr(userIdKey).get();
|
if (Objects.nonNull(userId)) {
|
NettyConnect.getUserChannelMap().remove(userId);
|
}
|
NettyConnect.getChannelGroup().remove(ctx.channel());
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
ctx.channel().close();
|
}
|
|
// 处理ws数据
|
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
|
// 处理关闭连接
|
if (frame instanceof CloseWebSocketFrame) {
|
NettyConnect.getChannelGroup().remove(ctx.channel());
|
ctx.close();
|
return;
|
}
|
if (frame instanceof TextWebSocketFrame) {
|
if ("ping".equals(((TextWebSocketFrame) frame).text())) {
|
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
|
return;
|
}
|
// 处理token,只有客户端连接的时候才会发送socket消息
|
String token = ((TextWebSocketFrame) frame).text();
|
try {
|
Message message = new ObjectMapper().readValue(token, Message.class);
|
// 验证token并将用户ID存入到连接中
|
this.handleToken(message.getToken(), ctx);
|
} catch (JsonProcessingException e) {
|
log.error("消息格式错误");
|
}
|
} else if (frame instanceof BinaryWebSocketFrame) {
|
// 处理二进制消息
|
// ...
|
} else if (frame instanceof PingWebSocketFrame) {
|
// 客户端也是netty实现的就可以用这个来ping、peng
|
// 处理 Ping 消息
|
// 收到 Ping 消息,回应一个 Pong 消息(表明我还活着)
|
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
|
} else if (frame instanceof PongWebSocketFrame) {
|
// 处理 Pong 消息
|
// pong消息如果没有特定需求,不用处理
|
} else if (frame instanceof ContinuationWebSocketFrame) {
|
// 处理连续帧消息(比较大的数据,分片)
|
// ...
|
}
|
}
|
|
// 处理token
|
private String handleToken(String token, ChannelHandlerContext ctx) {
|
if (!StringUtils.hasText(token)) {
|
NettyConnect.getChannelGroup().remove(ctx.channel());
|
throw new RuntimeException("非法的访问凭证");
|
}
|
// 获取 userId 参数
|
SysUserVO currentUserInfo = null;
|
try {
|
currentUserInfo = tokenUtil.getCurrentUserInfo(token);
|
} catch (JsonProcessingException e) {
|
NettyConnect.getChannelGroup().remove(ctx.channel());
|
throw new RuntimeException("非法的访问凭证");
|
}
|
if (Objects.isNull(currentUserInfo)) {
|
NettyConnect.getChannelGroup().remove(ctx.channel());
|
throw new RuntimeException("用户不存在");
|
}
|
|
String userId = currentUserInfo.getId();
|
AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
|
ctx.channel().attr(userIdKey).set(userId);
|
NettyConnect.getUserChannelMap().put(userId, ctx.channel());
|
|
return userId;
|
}
|
|
|
}
|