package com.monkeylessey.websocket; import com.monkeylessey.websocket.handler.HeartBeatHandler; import com.monkeylessey.websocket.handler.WebSocketHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * webscoket 服务 * * @author:xp * @date:2024/4/11 17:47 */ @Slf4j @Component @RequiredArgsConstructor public class WebsocketServer { private final WebSocketHandler webSocketHandler; public void runWebsocket() throws Exception { // 处理 I/O 操作的多线程事件循环组(线程池)。bossGroup用于接收传入的连接,workerGroup用于处理IO操作 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服务器引导(启动)类,提供了一些方法使开发者能够简单的启动服务端 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 指定服务器通道(主、子通道)的实现类 .channel(NioServerSocketChannel.class) // 设置通道初始化器(ChannelInitializer),每次创建的通道都按这个初始化 .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { // websocket的握手阶段是使用的Http,所以需要添处理http请求: // 用于将 HTTP 请求和响应转换为字节流以及将字节流转换为 HTTP 请求和响应 ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ObjectEncoder()); // 以块的方式来写的处理器 ch.pipeline().addLast(new ChunkedWriteHandler()); // ch.pipeline().addLast(new NettyWebSocketParamHandler(secret)); // 针对客户端,若10s内无读事件则触发心跳处理方法HeartBeatHandler#userEventTriggered ch.pipeline().addLast(new IdleStateHandler(60 , 60 , 60)); // 自定义空闲状态检测(自定义心跳检测handler) ch.pipeline().addLast(new HeartBeatHandler()); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); ch.pipeline().addLast(webSocketHandler); } }) // 设置服务器通道(主通道)的选项,此处是设置连接请求队列的最大长度是128 .option(ChannelOption.SO_BACKLOG, 128); // 绑定服务器到指定的端口,并且等待绑定操作完成。 ChannelFuture f = b.bind(8044).sync(); log.info("websocket启动成功"); log.info("程序启动成功"); // 等待服务器的通道关闭。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.error("websocket关闭"); } } }