欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot集成netty在项目中的应用

程序员文章站 2022-07-12 20:29:36
...

最近做的这个项目,需要和服务端进行tcp通信。大概的需求是这样,服务端是物理硬件,由c++代码控制,后台和机器通过tcp进行通信。当前台输入数据给后台之后,后台作为客户端给服务端发送数据,返回给后台,后台通过websocket将数据不停的发送到前台。下面记录一下后台作为客户端发送给机器的部分。


netty的客户端

package cn.zxw.netty.work;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author zxw
 * @version 1.0
 * @description 连接的客户端
 * @data: 2020/3/6 21:33
 */
@Component
public class SayOnClient implements InitializingBean {

    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    @Value("${netty.host}")
    private String host;
    @Value("${netty.port}")
    private int port;
    @Value("${netty.readTime}")
    private int readTime;
    @Value("${netty.writeTime}")
    private int writeTime;
    @Value("${netty.allTime}")
    private int allTime;

    public static Channel channel = null;

    @Override
    public void afterPropertiesSet() {
        createBootStrap(new Bootstrap(), eventLoopGroup);
    }

    void createBootStrap(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
        try {
            if (bootstrap != null) {
                //定义handler  因为在内部类里面无法传入this对象定义到外面传递过去
                final SayOnHandler sayOnHandler = new SayOnHandler(this);
                bootstrap.group(eventLoopGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new IdleStateHandler(readTime, writeTime, allTime))
                                        //添加自定义handler
                                        .addLast(sayOnHandler)
                                        //添加解码器
                                        .addLast("sayOnDecoder", new SayOnDecoder());
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect(host, port);
                channelFuture.addListener(new SayOnListener(this));
                //获得channel对象赋值给静态变量
                channel = channelFuture.channel();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


自定义的handler

package cn.zxw.netty.work;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.springframework.stereotype.Component;

import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

/**
 * @author zxw
 * @version 1.0
 * @description 自定义handler  的区别就是SimpleChannelInboundHandler
 * 在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。
 * 而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,
 * 而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release
 * @data: 2020/3/7 20:21
 */
@Component
@ChannelHandler.Sharable
public class SayOnHandler extends SimpleChannelInboundHandler<SocketChannel> {

    private SayOnClient sayOnClient;

    public SayOnHandler(SayOnClient sayOnClient) {
        this.sayOnClient = sayOnClient;
    }

    /**
     * 服务端断开连接会触发,断开后客户端会尝试重连操作
     *
     * @param ctx
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        try {
            final EventLoop eventLoop = ctx.channel().eventLoop();
            //获得eventLoop对象后进行任务调度,执行客户端再次连接方法
            eventLoop.schedule(() -> {
                sayOnClient.createBootStrap(new Bootstrap(), eventLoop);
            }, 10L, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, SocketChannel socketChannel) throws Exception {
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
                case WRITER_IDLE:
                    System.out.println("发送心跳----写数据");
                    ctx.channel().writeAndFlush("111");
                    break;
                case READER_IDLE:
                    System.out.println("读超时");
//                    ctx.channel().close();
                    break;
                default:
                    ctx.channel().close();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("发生异常:");
        cause.printStackTrace();
        ctx.channel().close();
    }
}

自定义的listener

package cn.zxw.netty.work;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;

import java.util.concurrent.TimeUnit;

/**
 * @author zxw
 * @version 1.0
 * @description 监听器
 * @data: 2020/3/7 20:33
 */
public class SayOnListener implements ChannelFutureListener {

    private SayOnClient sayOnClient;

    public SayOnListener(SayOnClient sayOnClient){
        this.sayOnClient = sayOnClient;
    }

    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (!channelFuture.isSuccess()){
           final EventLoop eventLoop = channelFuture.channel().eventLoop();
           //如果连接失败,进行重连的任务调度
           eventLoop.schedule(()->{
               sayOnClient.createBootStrap(new Bootstrap(),eventLoop);
           },10L, TimeUnit.SECONDS);
            System.out.println("开始重连!!");
        } else {
            System.out.println("连接成功!!");
        }
    }
}

编写controller进行测试


/**
 * @author zxw
 * @version 1.0
 * @description bootstart
 * @data: 2020/2/28 11:10
 */
@RestController
@Api(tags = "TestController", description = "测试controller")
public class TestController {

    @ApiOperation("测试netty程序")
    @RequestMapping(value = "/nettyTest", method = RequestMethod.POST)
    public String nettyTest() {

        SayOnClient.channel.writeAndFlush(Unpooled.copiedBuffer("bytes".getBytes()));
        return "success";
    }

使用tcp服务端工具起一个服务器进行测试,发送数据后如下

SpringBoot集成netty在项目中的应用


此方法是自己第一次接触netty后在项目中的应用,如果有更好更规范的方式欢迎大家指点(因为客户端没有进行优雅关闭)