欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

netty之NioEventLoopGroup源码分析二

程序员文章站 2023-02-26 15:52:33
大家好,今天我准备死磕NioEventLoopGroup的源码,首先讲下概念,NioEventLoopGroup 它是一个线程池,存放NioEventLoop,一个数组,今天打算先看下这行代码的初始化 一、初始化 1、时序图: 2.类的关系 3.源码说明: 步骤1: 如上四个方法是NioEventL ......

大家好,今天我准备死磕nioeventloopgroup的源码,首先讲下概念,nioeventloopgroup 它是一个线程池,存放nioeventloop,一个数组,今天打算先看下这行代码的初始化

 eventloopgroup bossgroup = new nioeventloopgroup();

一、初始化

1、时序图:

 

netty之NioEventLoopGroup源码分析二

2.类的关系

netty之NioEventLoopGroup源码分析二

3.源码说明:

 步骤1:

 1 eventloopgroup bossgroup = new nioeventloopgroup(); 
 2 //调用链1   
 3 public nioeventloopgroup() {
 4     this(0);
 5 }
 6 //调用链2
 7 public nioeventloopgroup(int nthreads) {
 8     this(nthreads, (executor) null);
 9 }
10 //调用链3
11 public nioeventloopgroup(int nthreads, executor executor) {
12     this(nthreads, executor, selectorprovider.provider());
13 }
14 //调用链4
15 public nioeventloopgroup(
16             int nthreads, executor executor, final selectorprovider selectorprovider) {
17     super(nthreads, executor, selectorprovider);
18 }

 如上四个方法是nioeventloopgroup类的四个构造函数,首先会执行第3行的无参构造函数,然后调用第7行的构造函数,再继续调用第11行的构造函数,这里需要获取一个selectorprovider,见步骤2,再继续调用第15行的构造函数,这里内部实现调用父类multithreadeventloopgroup的构造函数。

步骤2:

 1 public static selectorprovider provider() {
 2         synchronized (lock) {
 3             if (provider != null)
 4                 return provider;
 5             return accesscontroller.doprivileged(
 6                 new privilegedaction<selectorprovider>() {
 7                     public selectorprovider run() {
 8                             if (loadproviderfromproperty())
 9                                 return provider;
10                             if (loadproviderasservice())
11                                 return provider;
12                             provider = sun.nio.ch.defaultselectorprovider.create();
13                             return provider;
14                         }
15                     });
16         }
17     }

 这里selectorprovider的静态方法provider获取一个selectorprovider,全局唯一的,有加锁,依赖底层系统的实现provider不一样,后续另开专题分析; 

 

步骤3:

1 protected multithreadeventloopgroup(int nthreads, executor executor, object... args) {
2     super(nthreads == 0 ? default_event_loop_threads : nthreads, executor, args);
3 }

如上是multithreadeventloopgroup的构造函数,内部又调用了父类multithreadeventexecutorgroup的构造函数,继续往下看;

步骤4:

 1 protected multithreadeventexecutorgroup(int nthreads, executor executor, object... args) {
 2         if (nthreads <= 0) {
 3             throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
 4         }
 5 
 6         if (executor == null) {
 7             executor = new threadpertaskexecutor(newdefaultthreadfactory());
 8         }
 9 
10         children = new eventexecutor[nthreads];
11         for (int i = 0; i < nthreads; i ++) {
12             boolean success = false;
13             try {
14                 children[i] = newchild(executor, args);
15                 success = true;
16             } catch (exception e) {
17                 // todo: think about if this is a good exception type
18                 throw new illegalstateexception("failed to create a child event loop", e);
19             } finally {
20                 if (!success) {
21                     for (int j = 0; j < i; j ++) {
22                         children[j].shutdowngracefully();
23                     }
24 
25                     for (int j = 0; j < i; j ++) {
26                         eventexecutor e = children[j];
27                         try {
28                             while (!e.isterminated()) {
29                                 e.awaittermination(integer.max_value, timeunit.seconds);
30                             }
31                         } catch (interruptedexception interrupted) {
32                             thread.currentthread().interrupt();
33                             break;
34                         }
35                     }
36                 }
37             }
38         }
39 
40         final futurelistener<object> terminationlistener = new futurelistener<object>() {
41             @override
42             public void operationcomplete(future<object> future) throws exception {
43                 if (terminatedchildren.incrementandget() == children.length) {
44                     terminationfuture.setsuccess(null);
45                 }
46             }
47         };
48 
49         for (eventexecutor e: children) {
50             e.terminationfuture().addlistener(terminationlistener);
51         }
52 
53         set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length);
54         collections.addall(childrenset, children);
55         readonlychildren = collections.unmodifiableset(childrenset);
56     }

  如上是核心实现的构造函数,代码比较多,我们分段来看下都做了哪些事? 

为了方便查看,把代码又粘贴了一遍

if (nthreads <= 0) {
    throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
}

判断线程数是否小于等于0,否则抛出异常。

 

if (executor == null) {
    executor = new threadpertaskexecutor(newdefaultthreadfactory());
}

第6-8行,会调用子类multithreadeventloopgroup的newdefaultthreadfactory方法生成threadfactory,这里涉及到java继承初始化的知识,不是调用multithreadeventexecutorgroup的newdefaultthreadfactory方法,需要注意一下;

 //调用链1,此方法是multithreadeventloopgroup类的
1 protected threadfactory newdefaultthreadfactory() { 2 return new defaultthreadfactory(getclass(), thread.max_priority); 3 } 4 5 //调用链2 6 public defaultthreadfactory(class<?> pooltype, int priority) { 7 this(pooltype, false, priority); 8 } 9 //调用链3 10 public defaultthreadfactory(class<?> pooltype, boolean daemon, int priority) { 11 this(topoolname(pooltype), daemon, priority); 12 } 13 14 //调用链4 15 private static string topoolname(class<?> pooltype) { 16 if (pooltype == null) { 17 throw new nullpointerexception("pooltype"); 18 } 19 string poolname; 20 package pkg = pooltype.getpackage(); 21 if (pkg != null) { 22 poolname = pooltype.getname().substring(pkg.getname().length() + 1); 23 } else { 24 poolname = pooltype.getname(); 25 } 26 27 switch (poolname.length()) { 28 case 0: 29 return "unknown"; 30 case 1: 31 return poolname.tolowercase(locale.us); 32 default: 33 if (character.isuppercase(poolname.charat(0)) && character.islowercase(poolname.charat(1))) { 34 return character.tolowercase(poolname.charat(0)) + poolname.substring(1); 35 } else { 36 return poolname; 37 } 38 } 39 } 40 41 //调用链5 42 public defaultthreadfactory(string poolname, boolean daemon, int priority) { 43 if (poolname == null) { 44 throw new nullpointerexception("poolname"); 45 } 46 if (priority < thread.min_priority || priority > thread.max_priority) { 47 throw new illegalargumentexception( 48 "priority: " + priority + " (expected: thread.min_priority <= priority <= thread.max_priority)"); 49 } 50 51 prefix = poolname + '-' + poolid.incrementandget() + '-'; 52 this.daemon = daemon; 53 this.priority = priority; 54 }

如上依次从调用链到调用链5,完成defaultthreadfactory初始化;

 

再继续初始化threadpertaskexecutor类

1 public threadpertaskexecutor(threadfactory threadfactory) {
2     if (threadfactory == null) {
3         throw new nullpointerexception("threadfactory");
4     }
5     this.threadfactory = threadfactory;
6 }

这里就是new 一个线程工程; 

 

 1  children = new eventexecutor[nthreads];
 2         for (int i = 0; i < nthreads; i ++) {
 3             boolean success = false;
 4             try {
 5                 children[i] = newchild(executor, args);
 6                 success = true;
 7             } catch (exception e) {
 8                 // todo: think about if this is a good exception type
 9                 throw new illegalstateexception("failed to create a child event loop", e);
10             } finally {
11                 if (!success) {
12                     for (int j = 0; j < i; j ++) {
13                         children[j].shutdowngracefully();
14                     }
15 
16                     for (int j = 0; j < i; j ++) {
17                         eventexecutor e = children[j];
18                         try {
19                             while (!e.isterminated()) {
20                                 e.awaittermination(integer.max_value, timeunit.seconds);
21                             }
22                         } catch (interruptedexception interrupted) {
23                             thread.currentthread().interrupt();
24                             break;
25                         }
26                     }
27                 }
28             }
29         }

上面初始化了8个(默认,可修改)nioeventloop, nioeventloop另外专题分析;

 final futurelistener<object> terminationlistener = new futurelistener<object>() {
            @override
            public void operationcomplete(future<object> future) throws exception {
                if (terminatedchildren.incrementandget() == children.length) {
                    terminationfuture.setsuccess(null);
                }
            }
        };

创建了一个监听器

 

 for (eventexecutor e: children) {
            e.terminationfuture().addlistener(terminationlistener);
        }

这里循环调用增加这个监听器;

 

 set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length);
 collections.addall(childrenset, children);
readonlychildren = collections.unmodifiableset(childrenset);

这里创建一个linkedhashset,在把children中的元素全部加进去,然后调用unmodifiableset,返回一个只读的chidren,readonlychildren;

 

好了,暂时先分析这么多。