SpringBoot集成netty在项目中的应用
程序员文章站
2022-07-12 20:29:36
...
最近做的这个项目,需要和服务端进行tcp通信。大概的需求是这样,服务端是物理硬件,由c++代码控制,后台和机器通过tcp进行通信。当前台输入数据给后台之后,后台作为客户端给服务端发送数据,返回给后台,后台通过websocket将数据不停的发送到前台。下面记录一下后台作为客户端发送给机器的部分。
netty的客户端
package cn.zxw.netty.work;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author zxw
* @version 1.0
* @description 连接的客户端
* @data: 2020/3/6 21:33
*/
@Component
public class SayOnClient implements InitializingBean {
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
@Value("${netty.host}")
private String host;
@Value("${netty.port}")
private int port;
@Value("${netty.readTime}")
private int readTime;
@Value("${netty.writeTime}")
private int writeTime;
@Value("${netty.allTime}")
private int allTime;
public static Channel channel = null;
@Override
public void afterPropertiesSet() {
createBootStrap(new Bootstrap(), eventLoopGroup);
}
void createBootStrap(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
try {
if (bootstrap != null) {
//定义handler 因为在内部类里面无法传入this对象定义到外面传递过去
final SayOnHandler sayOnHandler = new SayOnHandler(this);
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(readTime, writeTime, allTime))
//添加自定义handler
.addLast(sayOnHandler)
//添加解码器
.addLast("sayOnDecoder", new SayOnDecoder());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.addListener(new SayOnListener(this));
//获得channel对象赋值给静态变量
channel = channelFuture.channel();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
自定义的handler
package cn.zxw.netty.work;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.springframework.stereotype.Component;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
/**
* @author zxw
* @version 1.0
* @description 自定义handler 的区别就是SimpleChannelInboundHandler
* 在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。
* 而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,
* 而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release
* @data: 2020/3/7 20:21
*/
@Component
@ChannelHandler.Sharable
public class SayOnHandler extends SimpleChannelInboundHandler<SocketChannel> {
private SayOnClient sayOnClient;
public SayOnHandler(SayOnClient sayOnClient) {
this.sayOnClient = sayOnClient;
}
/**
* 服务端断开连接会触发,断开后客户端会尝试重连操作
*
* @param ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
try {
final EventLoop eventLoop = ctx.channel().eventLoop();
//获得eventLoop对象后进行任务调度,执行客户端再次连接方法
eventLoop.schedule(() -> {
sayOnClient.createBootStrap(new Bootstrap(), eventLoop);
}, 10L, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SocketChannel socketChannel) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
switch (idleStateEvent.state()) {
case WRITER_IDLE:
System.out.println("发送心跳----写数据");
ctx.channel().writeAndFlush("111");
break;
case READER_IDLE:
System.out.println("读超时");
// ctx.channel().close();
break;
default:
ctx.channel().close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生异常:");
cause.printStackTrace();
ctx.channel().close();
}
}
自定义的listener
package cn.zxw.netty.work;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import java.util.concurrent.TimeUnit;
/**
* @author zxw
* @version 1.0
* @description 监听器
* @data: 2020/3/7 20:33
*/
public class SayOnListener implements ChannelFutureListener {
private SayOnClient sayOnClient;
public SayOnListener(SayOnClient sayOnClient){
this.sayOnClient = sayOnClient;
}
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()){
final EventLoop eventLoop = channelFuture.channel().eventLoop();
//如果连接失败,进行重连的任务调度
eventLoop.schedule(()->{
sayOnClient.createBootStrap(new Bootstrap(),eventLoop);
},10L, TimeUnit.SECONDS);
System.out.println("开始重连!!");
} else {
System.out.println("连接成功!!");
}
}
}
编写controller进行测试
/**
* @author zxw
* @version 1.0
* @description bootstart
* @data: 2020/2/28 11:10
*/
@RestController
@Api(tags = "TestController", description = "测试controller")
public class TestController {
@ApiOperation("测试netty程序")
@RequestMapping(value = "/nettyTest", method = RequestMethod.POST)
public String nettyTest() {
SayOnClient.channel.writeAndFlush(Unpooled.copiedBuffer("bytes".getBytes()));
return "success";
}
使用tcp服务端工具起一个服务器进行测试,发送数据后如下
此方法是自己第一次接触netty后在项目中的应用,如果有更好更规范的方式欢迎大家指点(因为客户端没有进行优雅关闭)
上一篇: 在Java8的foreach()中使用return
下一篇: Spring Boot 入门指南