xiangpei
2024-04-11 ac92608ba4d4a2bbeffc124ee25d5f2778617e0e
netty实现websocket
2个文件已修改
2个文件已添加
139 ■■■■■ 已修改文件
ycl-server/pom.xml 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/PlatformApplication.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ycl-server/pom.xml
@@ -17,6 +17,14 @@
    </properties>
    <dependencies>
        <!--netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!-- 代码生成-->
        <dependency>
            <groupId>com.ycl</groupId>
ycl-server/src/main/java/com/ycl/PlatformApplication.java
@@ -1,5 +1,6 @@
package com.ycl;
import com.ycl.websocket.WebsocketServer;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
@@ -15,5 +16,11 @@
    public static void main(String[] args) {
        SpringApplication.run(PlatformApplication.class, args);
        log.info("(♥◠‿◠)ノ゙  自贡运维平台启动成功   ლ(´ڡ`ლ)゙");
        try {
            WebsocketServer.runWebsocket();
            log.info("websocket启动成功");
        } catch (Exception e) {
            log.info("websocket启动失败");
        }
    }
}
ycl-server/src/main/java/com/ycl/websocket/WebSocketHandler.java
New file
@@ -0,0 +1,67 @@
package com.ycl.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Objects;
public class WebSocketHandler extends SimpleChannelInboundHandler {
    public static ChannelGroup connects = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
        if(Objects.nonNull(o) && o instanceof WebSocketFrame){
            this.handleWebSocketFrame(ctx, (WebSocketFrame) o);
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("有新的客户端连接上了");
        connects.add(ctx.channel());
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("有客户端断开连接了");
        connects.remove(ctx.channel());
    }
    // 处理ws数据
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 处理关闭连接
        if (frame instanceof CloseWebSocketFrame) {
            connects.remove(ctx.channel());
            ctx.close();
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            // 处理文本消息
            String text = ((TextWebSocketFrame) frame).text();
            System.out.println("服务器收到客户端数据:" +text);
            // 此处为群发,单独发可使用connects.find(ctx.channel().id()).writeAndFlush()发送
            connects.writeAndFlush(new TextWebSocketFrame("你好客户端"));
            // ...
        } else if (frame instanceof BinaryWebSocketFrame) {
            // 处理二进制消息
            // ...
        } else if (frame instanceof PingWebSocketFrame) {
            // 处理 Ping 消息
            // 收到 Ping 消息,回应一个 Pong 消息(表明我还活着)
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        } else if (frame instanceof PongWebSocketFrame) {
            // 处理 Pong 消息
            // pong消息如果没有特定需求,不用处理
        } else if (frame instanceof ContinuationWebSocketFrame) {
            // 处理连续帧消息(比较大的数据,分片)
            // ...
        }
    }
}
ycl-server/src/main/java/com/ycl/websocket/WebsocketServer.java
New file
@@ -0,0 +1,57 @@
package com.ycl.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
/**
 * webscoket 服务
 *
 * @author:xp
 * @date:2024/4/11 17:47
 */
public class WebsocketServer {
    public static void runWebsocket() throws Exception {
        // 处理 I/O 操作的多线程事件循环组(线程池)。bossGroup用于接收传入的连接,workerGroup用于处理IO操作
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服务器引导(启动)类,提供了一些方法使开发者能够简单的启动服务端
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    // 指定服务器通道(主、子通道)的实现类
                    .channel(NioServerSocketChannel.class)
                    // 设置通道初始化器(ChannelInitializer),每次创建的通道都按这个初始化
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // websocket的握手阶段是使用的Http,所以需要添处理http请求:
                            // 用于将 HTTP 请求和响应转换为字节流以及将字节流转换为 HTTP 请求和响应
                            ch.pipeline().addLast(new HttpServerCodec());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 6000));
                            ch.pipeline().addLast(new WebSocketHandler());
                        }
                    })
                    // 设置服务器通道(主通道)的选项,此处是设置连接请求队列的最大长度是128
                    .option(ChannelOption.SO_BACKLOG, 128);
            // 绑定服务器到指定的端口,并且等待绑定操作完成。
            ChannelFuture f = b.bind(8084).sync();
            // 等待服务器的通道关闭。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}