xiangpei
2024-04-23 d4ef4a8eebc8b115bacd9b3170618fd024938042
websocket优化
3个文件已修改
8个文件已添加
2个文件已删除
619 ■■■■ 已修改文件
ycl-common/src/main/java/enumeration/MsgTypeEnum.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/PlatformApplication.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/system/service/TokenService.java 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java 120 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/msg/Message.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-common/src/main/java/enumeration/MsgTypeEnum.java
New file
@@ -0,0 +1,19 @@
package enumeration;
/**
 * @author:xp
 * @date:2024/4/21 10:25
 */
public enum MsgTypeEnum {
    ;
    private final String value;
    private final String desc;
    MsgTypeEnum(String value, String desc) {
        this.value = value;
        this.desc = desc;
    }
}
ycl-server/src/main/java/com/ycl/PlatformApplication.java
@@ -1,8 +1,6 @@
package com.ycl;
import com.ycl.websocket.WebsocketServer;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
@@ -16,11 +14,5 @@
    public static void main(String[] args) {
        SpringApplication.run(PlatformApplication.class, args);
        log.info("(♥◠‿◠)ノ゙  自贡运维平台启动成功   ლ(´ڡ`ლ)゙");
        try {
            WebsocketServer.runWebsocket();
        } catch (Exception e) {
            log.info("websocket启动失败");
        }
    }
}
ycl-server/src/main/java/com/ycl/system/service/TokenService.java
@@ -83,6 +83,33 @@
        return null;
    }
    /**
     * 获取用户身份信息
     *
     * @return 用户信息
     */
    public LoginUser getLoginUser(String token)
    {
        if (StringUtils.isNotEmpty(token))
        {
            try
            {
                Claims claims = parseToken(token);
                // 解析对应的权限以及用户信息
                String uuid = (String) claims.get(Constants.LOGIN_USER_KEY);
                String userKey = getTokenKey(uuid);
                LoginUser user = redisCache.getCacheObject(userKey);
                return user;
            }
            catch (Exception e)
            {
                log.error("获取用户信息异常'{}'", e.getMessage());
            }
        }
        return null;
    }
    /**
     * 设置用户身份信息
     */
@@ -210,6 +237,17 @@
    }
    /**
     * 从令牌中获取用户名
     *
     * @param token 令牌
     * @return 用户名
     */
    public LoginUser getUserInfoFromToken(String token)
    {
        return getLoginUser(token);
    }
    /**
     * 获取请求token
     *
     * @param request
ycl-server/src/main/java/com/ycl/websocket/NettyConfig.java
File was deleted
ycl-server/src/main/java/com/ycl/websocket/NettyConnect.java
New file
@@ -0,0 +1,78 @@
package com.ycl.websocket;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author:xp
 * @date:2024/4/19 14:02
 */
public class NettyConnect {
    /**
     * netty提供的管理连接集合
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<Long, Channel> userChannelMap = new ConcurrentHashMap<>(128);
    private NettyConnect() {}
    /**
     * 获取channel组
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }
    /**
     * 移除channel时,同时移除用户对应关系
     *
     * @param channel
     */
    public static void removeChannel(Channel channel) {
        Long userId = null;
        for (Map.Entry<Long, Channel> channelEntry : userChannelMap.entrySet()) {
            if (Objects.equals(channel, channelEntry.getValue())) {
                userId = channelEntry.getKey();
                break;
            }
        }
        if (Objects.nonNull(userId)) {
            userChannelMap.remove(userId);
        }
        channelGroup.remove(channel);
    }
    /**
     * 获取用户channel map
     * @return
     */
    public static ConcurrentHashMap<Long, Channel> getUserChannelMap(){
        return userChannelMap;
    }
    /**
     * 移除用户关系时,同时移除连接
     *
     * @param userId
     */
    public static void removeUserChannel(Integer userId) {
        Channel channel = userChannelMap.get(userId);
        if (Objects.nonNull(userId)) {
            userChannelMap.remove(userId);
            channelGroup.remove(channel);
        }
    }
}
ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
File was deleted
ycl-server/src/main/java/com/ycl/websocket/WebSocketListener.java
New file
@@ -0,0 +1,31 @@
package com.ycl.websocket;
import com.ycl.system.service.TokenService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
 * @author:xp
 * @date:2024/4/14 10:56
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketListener {
    private final WebsocketServer websocketServer;
    @EventListener(classes = {ApplicationReadyEvent.class})
    public void runWebSocket() {
        try {
            websocketServer.runWebsocket();
        } catch (Exception e) {
            log.warn("websocket启动失败");
        }
    }
}
ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
@@ -1,5 +1,7 @@
package com.ycl.websocket;
import com.ycl.websocket.handler.HeartBeatHandler;
import com.ycl.websocket.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -10,7 +12,12 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * webscoket 服务
@@ -19,9 +26,13 @@
 * @date:2024/4/11 17:47
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class WebsocketServer {
    public static void runWebsocket() throws Exception {
    private final WebSocketHandler webSocketHandler;
    public void runWebsocket() throws Exception {
        // 处理 I/O 操作的多线程事件循环组(线程池)。bossGroup用于接收传入的连接,workerGroup用于处理IO操作
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -38,22 +49,31 @@
                            // websocket的握手阶段是使用的Http,所以需要添处理http请求:
                            // 用于将 HTTP 请求和响应转换为字节流以及将字节流转换为 HTTP 请求和响应
                            ch.pipeline().addLast(new HttpServerCodec());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 6000));
                            ch.pipeline().addLast(new WebSocketHandler());
                            ch.pipeline().addLast(new ObjectEncoder());
                            // 以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
//                            ch.pipeline().addLast(new NettyWebSocketParamHandler(secret));
                            // 针对客户端,若10s内无读事件则触发心跳处理方法HeartBeatHandler#userEventTriggered
                            ch.pipeline().addLast(new IdleStateHandler(60 , 60 , 60));
                            // 自定义空闲状态检测(自定义心跳检测handler)
                            ch.pipeline().addLast(new HeartBeatHandler());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
                            ch.pipeline().addLast(webSocketHandler);
                        }
                    })
                    // 设置服务器通道(主通道)的选项,此处是设置连接请求队列的最大长度是128
                    .option(ChannelOption.SO_BACKLOG, 128);
            // 绑定服务器到指定的端口,并且等待绑定操作完成。
            ChannelFuture f = b.bind(8084).sync();
            ChannelFuture f = b.bind(8044).sync();
            log.info("websocket启动成功");
            log.info("程序启动成功");
            // 等待服务器的通道关闭。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            log.info("websocket关闭");
            log.error("websocket关闭");
        }
    }
ycl-server/src/main/java/com/ycl/websocket/handler/HeartBeatHandler.java
New file
@@ -0,0 +1,40 @@
package com.ycl.websocket.handler;
import com.ycl.websocket.NettyConnect;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import java.util.Objects;
/**
 * @author:xp
 * @date:2024/4/19 14:41
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    private int lossConnectCount = 0;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.READER_IDLE){
                lossConnectCount ++;
                if (lossConnectCount > 2){
                    AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
                    Long userId = ctx.channel().attr(userIdKey).get();
                    if (Objects.nonNull(userId)) {
                        NettyConnect.getUserChannelMap().remove(userId);
                    }
                    NettyConnect.getChannelGroup().remove(ctx.channel());
                    ctx.channel().close();
                }
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}
ycl-server/src/main/java/com/ycl/websocket/handler/WebSocketHandler.java
New file
@@ -0,0 +1,120 @@
package com.ycl.websocket.handler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ycl.system.model.LoginUser;
import com.ycl.system.service.TokenService;
import com.ycl.websocket.NettyConnect;
import com.ycl.websocket.msg.Message;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Objects;
@Slf4j
@ChannelHandler.Sharable
@Component
@RequiredArgsConstructor
public class WebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    private final TokenService tokenService;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
        this.handleWebSocketFrame(ctx, msg);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("有客户端连接了");
        if (!NettyConnect.getChannelGroup().contains(ctx.channel())) {
            NettyConnect.getChannelGroup().add(ctx.channel());
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("有客户端断开连接了");
        AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
        Long userId = ctx.channel().attr(userIdKey).get();
        if (Objects.nonNull(userId)) {
            NettyConnect.getUserChannelMap().remove(userId);
        }
        NettyConnect.getChannelGroup().remove(ctx.channel());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.channel().close();
    }
    // 处理ws数据
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 处理关闭连接
        if (frame instanceof CloseWebSocketFrame) {
            NettyConnect.getChannelGroup().remove(ctx.channel());
            ctx.close();
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            if ("ping".equals(((TextWebSocketFrame) frame).text())) {
                ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 处理token,只有客户端连接的时候才会发送socket消息
            String token = ((TextWebSocketFrame) frame).text();
            try {
                Message message = new ObjectMapper().readValue(token, Message.class);
                // 验证token并将用户ID存入到连接中
                this.handleToken(message.getToken(), ctx);
            } catch (JsonProcessingException e) {
                log.error("消息格式错误");
            }
        } else if (frame instanceof BinaryWebSocketFrame) {
            // 处理二进制消息
            // ...
        } else if (frame instanceof PingWebSocketFrame) {
            // 用于netty实现的客户端
            // 处理 Ping 消息
            // 收到 Ping 消息,回应一个 Pong 消息(表明我还活着)
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        } else if (frame instanceof PongWebSocketFrame) {
            // 处理 Pong 消息
            // pong消息如果没有特定需求,不用处理
        } else if (frame instanceof ContinuationWebSocketFrame) {
            // 处理连续帧消息(比较大的数据,分片)
            // ...
        }
    }
    // 处理token
    private Long handleToken(String token, ChannelHandlerContext ctx) {
        if (!StringUtils.hasText(token)) {
            NettyConnect.getChannelGroup().remove(ctx.channel());
            throw new RuntimeException("非法的访问凭证");
        }
        // 获取 userId 参数
        LoginUser user = null;
        user = tokenService.getUserInfoFromToken(token);
        if (Objects.isNull(user)) {
            NettyConnect.getChannelGroup().remove(ctx.channel());
            throw new RuntimeException("用户不存在");
        }
        Long userId = user.getUserId();
        AttributeKey<Long> userIdKey = AttributeKey.valueOf("userId");
        ctx.channel().attr(userIdKey).set(userId);
        NettyConnect.getUserChannelMap().put(userId, ctx.channel());
        return userId;
    }
}
ycl-server/src/main/java/com/ycl/websocket/msg/Message.java
New file
@@ -0,0 +1,22 @@
package com.ycl.websocket.msg;
import enumeration.MsgTypeEnum;
import lombok.Data;
/**
 * @author:xp
 * @date:2024/4/20 22:14
 */
@Data
public class Message {
    /** token */
    private String token;
    /** 消息内容-json */
    private String msg;
    /** 消息类型,给前端判定处理的 */
    private MsgTypeEnum type;
}
ycl-server/src/main/java/com/ycl/websocket/service/DefaultSendWebsocketMsg.java
New file
@@ -0,0 +1,63 @@
package com.ycl.websocket.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ycl.websocket.msg.Message;
import com.ycl.websocket.NettyConnect;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
/**
 * @author:xp
 * @date:2024/4/21 9:39
 */
@Slf4j
@Service
public class DefaultSendWebsocketMsg implements SendWebsocketMsg {
    private final static ObjectMapper json = new ObjectMapper();
    @Override
    public void sendByUserId(Long userId, Message msg) {
        Channel channel = NettyConnect.getUserChannelMap().get(userId);
        if (Objects.nonNull(channel) && channel.isActive()) {
            try {
                channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg)));
            } catch (JsonProcessingException e) {
                log.error("消息发送失败,请检查消息格式");
            }
        }
    }
    @Override
    public void sendManyUser(List<Long> userIds, Message msg) {
        if (CollectionUtils.isEmpty(userIds)) {
            return;
        }
        for (Long userId : userIds) {
            this.sendByUserId(userId, msg);
        }
    }
    @Override
    public void broadcast(Message msg) {
        Collection<Channel> connects = NettyConnect.getUserChannelMap().values();
        try {
            String data = json.writeValueAsString(msg);
            connects.stream().forEach(connect -> {
                if (connect.isActive()) {
                    connect.writeAndFlush(new TextWebSocketFrame(data));
                }
            });
        } catch (JsonProcessingException e) {
            log.error("消息发送失败,请检查消息格式");
        }
    }
}
ycl-server/src/main/java/com/ycl/websocket/service/SendWebsocketMsg.java
New file
@@ -0,0 +1,35 @@
package com.ycl.websocket.service;
import com.ycl.websocket.msg.Message;
import java.util.List;
/**
 * @author:xp
 * @date:2024/4/21 9:33
 */
public interface SendWebsocketMsg {
    /**
     * 发给指定某个人消息
     *
     * @param userId 用户ID
     * @param msg 消息内容
     */
    void sendByUserId(Long userId, Message msg);
    /**
     * 发给指定人消息
     *
     * @param msg
     */
    void sendManyUser(List<Long> userIds, Message msg);
    /**
     * 广播消息
     *
     * @param msg
     */
    void broadcast(Message msg);
}