欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty入门示例

程序员文章站 2022-04-23 11:55:59
...

服务端:

public class DiscardServer {

    private int port;
    public DiscardServer(int port){
        this.port = port;
    }

    public void run() throws Exception{
        //Group:群组,Loop:循环,Event:事件,这几个东西联在一起,相比大家也大概明白它的用途了。
        //Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期。
        //NioEventLoopGroup是一个处理I/O操作的多线程事件循环
        //因为bossGroup仅接收客户端连接,不做复杂的逻辑处理,为了尽可能减少资源的占用,取值越小越好
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //workerGroup作为worker,处理boss接收的连接的流量和将接收的连接注册进入这个worker
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //ServerBootstrap负责建立服务端
            //你可以直接使用Channel去建立服务端,但是大多数情况下你无需做这种乏味的事情
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            //指定使用NioServerSocketChannel产生一个Channel用来接收连接
            .channel(NioServerSocketChannel.class)
            //ChannelInitializer用于配置一个新的Channel
            //用于向你的Channel当中添加ChannelInboundHandler的实现
            .childHandler(new ChannelInitializer<SocketChannel>() {
                public void initChannel(SocketChannel ch) throws Exception {
                    //ChannelPipeline用于存放管理ChannelHandel
                    //ChannelHandler用于处理请求响应的业务逻辑相关代码
                    ch.pipeline().addLast(new ReadTimeoutHandler(30));  // 设置超时处理,若无数据读取则断开连接
                    ch.pipeline().addLast(new DiscardServerHandler());  // 自定义处理器
                };
            })
            //对Channel进行一些配置,具体的配置项查看ChannelOption
            //注意以下是socket的标准参数
            //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
            //Option是为了NioServerSocketChannel设置的,用来接收传入连接的
            .option(ChannelOption.SO_BACKLOG, 128)
            //是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被**。
            //childOption是用来给父级ServerChannel之下的Channels设置参数的
            .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();
            // Wait until the server socket is closed.
            //sync()会同步等待连接操作结果,用户线程将在此wait(),直到连接操作完成之后,线程被notify(),用户代码继续执行
            //closeFuture()当Channel关闭时返回一个ChannelFuture,用于链路检测
            f.channel().closeFuture().sync();
        }finally{
            //资源优雅释放
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {

        int port = 8088;
        try {
            new DiscardServer(port).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端:

public class DiscardClient {

    private String server_ip;
    private int port;

    public DiscardClient(String server_ip, int port){
        this.server_ip = server_ip;
        this.port = port;
    }

    public void run() throws Exception{

        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {

            Bootstrap b = new Bootstrap();
            b.group(clientGroup)
            //指定使用NioSocketChannel产生一个Channel用来接收连接
            .channel(NioSocketChannel.class)
            //ChannelInitializer用于配置一个新的Channel
            //用于向你的Channel当中添加ChannelInboundHandler的实现
            .handler(new ChannelInitializer<SocketChannel>() {
                public void initChannel(SocketChannel ch) throws Exception {
                    //ChannelPipeline用于存放管理ChannelHandel
                    //ChannelHandler用于处理请求响应的业务逻辑相关代码
                    ch.pipeline().addLast(new DiscardClientHandler());
                };
            });

            ChannelFuture cf = b.connect(server_ip, port).sync();

            //sync()会同步等待连接操作结果,用户线程将在此wait(),直到连接操作完成之后,线程被notify(),用户代码继续执行
            //closeFuture()当Channel关闭时返回一个ChannelFuture,用于链路检测
            cf.channel().closeFuture().sync();

        }finally{
            //资源优雅释放
            clientGroup.shutdownGracefully();
            System.out.println("关闭连接...");
        }
    }

    public static void main(String[] args) {

        String server_ip = "127.0.0.1";
        int port = 8088;

        try {
            new DiscardClient(server_ip, port).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

自定义handler:
服务端handler:

public class DiscardServerHandler extends ChannelHandlerAdapter {

    /* 
     * @说明:当客户端的channel**时,执行该方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接...");
    }

    /* 
     * @说明:该方法用于接收从客户端接收的信息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //ByteBuf是一个引用计数对象实现ReferenceCounted,他就是在有对象引用的时候计数+1,无的时候计数-1,当为0对象释放内存
        ByteBuf request = (ByteBuf)msg;
        try {

            byte[] buf = new byte[request.readableBytes()];

            request.readBytes(buf, 0, buf.length);

            System.out.println(new String(buf));
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /* 
     * @说明:channel抛出异常时,执行该方法
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端handler:

public class DiscardClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        // 发送消息
        for(int i = 0; i < 3; i++) {
            ctx.channel().writeAndFlush(Unpooled.copiedBuffer("你好,服务端!".getBytes()));
            TimeUnit.SECONDS.sleep(2);
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}