欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty框架问题记录1--多线程下批量发送消息导致消息被覆盖

程序员文章站 2023-02-07 17:11:23
业务背景 项目是基于Netty实现的实时课堂项目,课堂中老师需要对试卷进行讲解,则老师向服务器发送一个打开试卷信息的请求,服务器获取试卷信息,将试卷信息发送给所有的客户端(学生和老师)。 发送给学生的时候需要在试卷信息中加上本人得分的信息。 实现方式大致如下: 结果:学生A收到的得分是学生B的得分, ......

业务背景

项目是基于netty实现的实时课堂项目,课堂中老师需要对试卷进行讲解,则老师向服务器发送一个打开试卷信息的请求,服务器获取试卷信息,将试卷信息发送给所有的客户端(学生和老师)。

发送给学生的时候需要在试卷信息中加上本人得分的信息。

 实现方式大致如下:

1 paper paper = getpaper(paperid); // 根据试卷id获取试卷详细信息
2 for(client client : allclients){
3    paper.setmyscore(getmyscore(client.getuserid())); //根据userid获取本人得分
4   client.send(paper); //向客户端发送数据
5 }

 

结果:学生a收到的得分是学生b的得分,也就是发送给clienta的paper数据被发送给clientb的paper数据给覆盖了,因为paper对象是同一个

 

原因分析:

虽然发送给所有客户端的信息都是paper对象,但是是在for循环里面执行的send方法,也就是说理论上应该是clienta的send方法执行完了之后才会执行clientb的send方法,也就是说理论上应该是学生a收到的paper信息之后学生b才会收到paper信息。

所以得出的结论猜想就是send方法不是同步执行的,而是异步的。追踪代码进行分析

第四行的代码client.send(paper) 实际就是调用了channel的writeandflush方法

追踪到abstractchannel的实现如下: 

1 @override
2     public channelfuture writeandflush(object msg) {
3         return pipeline.writeandflush(msg);
4     }

 

 执行了channelpipeline的writeandflush方法,跟踪实现类defaultchannelpipeline的实现如下:

1 @override
2     public final channelfuture writeandflush(object msg) {
3         return tail.writeandflush(msg);
4     }

 

执行的是channelhandlercontext的writeandflush方法,跟踪实现类abstractchannelhandlercontext实现如下:

1 @override
2     public channelfuture writeandflush(object msg) {
3         return writeandflush(msg, newpromise());
4     }

 

执行了内部的writeandflush方法,继续跟踪如下:

 1 @override
 2     public channelfuture writeandflush(object msg, channelpromise promise) {
 3         if (msg == null) {
 4             throw new nullpointerexception("msg");
 5         }
 6 
 7         if (isnotvalidpromise(promise, true)) {
 8             referencecountutil.release(msg);
 9             // cancelled
10             return promise;
11         }
12 
13         write(msg, true, promise);
14 
15         return promise;
16     }

 

write方法如下:

 1 private void write(object msg, boolean flush, channelpromise promise) {
 2         abstractchannelhandlercontext next = findcontextoutbound();
 3         final object m = pipeline.touch(msg, next);
 4         eventexecutor executor = next.executor();
 5         if (executor.ineventloop()) { //判断当前线程是否是eventloop线程
 6             if (flush) {
 7                 next.invokewriteandflush(m, promise);
 8             } else {
 9                 next.invokewrite(m, promise);
10             }
11         } else {
12             abstractwritetask task;
13             if (flush) {
14                 task = writeandflushtask.newinstance(next, m, promise);
15             }  else {
16                 task = writetask.newinstance(next, m, promise);
17             }
18             safeexecute(executor, task, promise, m);
19         }
20     }

 

跟踪到这里终于有所发现了,方法逻辑大致如下:

1.获取channelpipeline中的head节点

2.获取当前channel的eventloop对象

3.判断当前channel的eventloop对象中的线程是否是当前线程

4.如果是eventloop线程,则直接执行writeandflush方法,也就是执行写入并且刷新到channelsocket中去

5.如果不是eventloop线程,则会创建一个abstractwritetask,然后将这个task添加到这个channel的eventloop中去 

 

分析到这里就可以总结问题的所在了,如果执行channel的writeandflush的线程不是work线程池中的线程,那么就会先将这个发送消息封装成一个task,然后添加到这个channel所属的eventloop中的阻塞队列中去,

然后通过eventloop的循环来从队列中获取任务来执行。一旦task添加到队列中完成,write方法就会返回。那么当下一个客户端再执行write方法时,由于msg内容是同一个对象,就会将前一个msg的内容给覆盖了。

从而就会出现发送给多个客户端的内容不同,但是接收到的内容是相同的内容。而本例中,执行channel的write方法的线程确实不是eventloop线程,因为我们采用了线程池来处理业务,当channel发送数据给服务器之后,

服务器解析channel中发送来的请求,然后执行业务处理,而执行业务的操作是采用线程池的方式实现的,所以最终通过channel发送数据给客户端的时候实际的线程是线程池中的线程,而并不是channel所属的eventloop中的线程。

 

总结:

netty中的work线程池中的eventloop并不是一个纯粹的io线程,除了有selector轮询io操作之外,还会处理系统的task和定时任务。

系统的task是通过eventloop的execute(runnable task)方法实现,eventloop内部有一个linkedblockingqueue阻塞队列保存task,task一般都是由于用户线程发起的io操作。

每个客户端有一个channel,每一个channel会绑定一个eventloop,所以每个channel的所以io操作默认都是由这个eventloop中的线程来执行。然后用户可以在自定义的线程中执行channel的方法。

当用户线程执行channel的io操作时,并不会立即执行,而是将io操作封装成一个task,然后添加到这个channel对应的eventloop的队列中,然后由这个eventloop中的线程来执行。所以channel的所有io操作最终还是

由同一个eventloop中的线程来执行的,只是发起channel的io操作的线程可以不是任何线程。

 

采用将io操作封装成task的原因主要是防止并发操作导致的锁竞争,因为如果不用task的方式,那么用户线程和io线程就可以同时操作网络资源,就存储并发问题,所以采用task的方式实现了局部的无锁化。

所以线程池固然好用,netty固然强大,但是如果没有深入理解,稍有不慎就可能会出现意想不到的bug。