欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Bootstrap初始化过程源码分析--netty客户端的启动

程序员文章站 2022-10-08 20:50:35
Bootstrap初始化过程 netty的客户端引导类是Bootstrap,我们看一下spark的rpc中客户端部分对Bootstrap的初始化过程 TransportClientFactory.createClient(InetSocketAddress address) 只需要贴出Bootstr ......

bootstrap初始化过程

netty的客户端引导类是bootstrap,我们看一下spark的rpc中客户端部分对bootstrap的初始化过程

transportclientfactory.createclient(inetsocketaddress address)

只需要贴出bootstrap初始化部分的代码

// 客户端引导对象
bootstrap bootstrap = new bootstrap();
// 设置各种参数
bootstrap.group(workergroup)
  .channel(socketchannelclass)
  // disable nagle's algorithm since we don't want packets to wait
  // 关闭nagle算法
  .option(channeloption.tcp_nodelay, true)
  .option(channeloption.so_keepalive, true)
  .option(channeloption.connect_timeout_millis, conf.connectiontimeoutms())
  .option(channeloption.allocator, pooledallocator);

// socket接收缓冲区
if (conf.receivebuf() > 0) {
  bootstrap.option(channeloption.so_rcvbuf, conf.receivebuf());
}

// socket发送缓冲区
// 对于接收和发送缓冲区的设置应该用如下的公式计算:
// 延迟 *带宽
// 例如延迟是1ms,带宽是10gbps,那么缓冲区大小应该设为1.25mb
if (conf.sendbuf() > 0) {
  bootstrap.option(channeloption.so_sndbuf, conf.sendbuf());
}

final atomicreference<transportclient> clientref = new atomicreference<>();
final atomicreference<channel> channelref = new atomicreference<>();

// 设置handler(处理器对象)
bootstrap.handler(new channelinitializer<socketchannel>() {
  @override
  public void initchannel(socketchannel ch) {
    transportchannelhandler clienthandler = context.initializepipeline(ch);
    clientref.set(clienthandler.getclient());
    channelref.set(ch);
  }
});

// connect to the remote server
long preconnect = system.nanotime();
// 与服务端建立连接,启动方法
channelfuture cf = bootstrap.connect(address);

分为几个主要的步骤:

  • 首先创建一个bootstrap对象,调用的是无参构造器
  • 设置各种参数,如通道类型,关闭nagle算法,接收和发送缓冲区大小,设置处理器
  • 调用connect与服务端建立连接

接下来,我们主要通过两条线索来分析bootstrap的启动过程,即构造器和connect两个方法,而对于设置参数的过程仅仅是给内部的一些成员变量赋值,所以不需要详细展开。

bootstrap.bootstrap()

bootstrap继承了abstractbootstrap,看了一下他们的无参构造方法,都是个空方法。。。。。。所以这一步,我们就省了,瞬间感觉飞起来了有没有^_^

bootstrap.connect(socketaddress remoteaddress)

public channelfuture connect(socketaddress remoteaddress) {
    // 检查非空
    objectutil.checknotnull(remoteaddress, "remoteaddress");
    // 同样是对一些成员变量检查非空,主要检查eventloopgroup,channelfactory,handler对象
    validate();
    return doresolveandconnect(remoteaddress, config.localaddress());
}

主要是做了一些非空检查,需要注意的是,channelfactory对象的设置,前面的spark中在对bootstrap初始化设置的时候调用了.channel(socketchannelclass)方法,这个方法如下:

public b channel(class<? extends c> channelclass) {
    return channelfactory(new reflectivechannelfactory<c>(
            objectutil.checknotnull(channelclass, "channelclass")
    ));
}

创建了一个reflectivechannelfactory对象,并赋值给内部的channelfactory成员。这个工厂类会根据传进来的class对象通过反射创建一个channel实例。

doresolveandconnect

从这个方法的逻辑中可以看出来,创建一个连接的过程分为两个主要的步骤;

  • 初始化一个channel对象并注册到eventloop中
  • 调用doresolveandconnect0方法完成tcp连接的建立

值得注意的是,initandregister方法返回一个future对象,这个类型通常用于异步机制的实现。在这里,如果注册没有立即成功的话,会给返回的futrue对象添加一个监听器,在注册成功以后建立tcp连接。

private channelfuture doresolveandconnect(final socketaddress remoteaddress, final socketaddress localaddress) {
    // 初始化一个channel对象并注册到eventloop中
    final channelfuture regfuture = initandregister();
    final channel channel = regfuture.channel();

    if (regfuture.isdone()) {
        // 如果注册失败,世界返回失败的future对象
        if (!regfuture.issuccess()) {
            return regfuture;
        }
        return doresolveandconnect0(channel, remoteaddress, localaddress, channel.newpromise());
    } else {// 如果注册还在进行中,需要向future对象添加一个监听器,以便在注册成功的时候做一些工作,监听器实际上就是一个回调对象
        // registration future is almost always fulfilled already, but just in case it's not.
        final pendingregistrationpromise promise = new pendingregistrationpromise(channel);
        regfuture.addlistener(new channelfuturelistener() {
            @override
            public void operationcomplete(channelfuture future) throws exception {
                // directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                throwable cause = future.cause();
                if (cause != null) {
                    // registration on the eventloop failed so fail the channelpromise directly to not cause an
                    // illegalstateexception once we try to access the eventloop of the channel.
                    promise.setfailure(cause);
                } else {
                    // registration was successful, so set the correct executor to use.
                    // see https://github.com/netty/netty/issues/2586
                    promise.registered();
                    // 注册成功后仍然调用doresolveandconnect0方法完成连接建立的过程
                    doresolveandconnect0(channel, remoteaddress, localaddress, promise);
                }
            }
        });
        return promise;
    }

initandregister

仍然分为两个步骤:

  • 通过channel工厂类创建一个channel对象,通过反射获取指定的channel类型的无参构造器,调用构造器来创建对象
  • 调用init方法对channel对象进行初始化,init方法是一个抽象方法,bootstrap和serverbootstrap的实现不同
  • 将channel注册到eventloopgroup中

注意看源码中的一段注释,这段注释对netty的线程模型的理解很有帮助,大致意思是说:

  • 如果当前的代码是在eventloopevent线程中执行的,那么代码运行到这里说明channel已经成功注册到eventloopevent上了,此时再调用bind() 或 connect()方法肯定是没有问题的
  • 如果当前代码不是在eventloopevent线程中执行的,也就是说当前线程是另外的线程,在这里继续调用bind() 或 connect()方法仍然是安全的,并不会由于并发引起方法执行顺序的错乱,原因是netty中一个channel只会绑定到一个线程上,所有关于这个channel的操作包括注册,bind或connect都会以排队任务的形式在一个线程中串行执行,这种做法也为netty规避了很多线程安全问题,从而减少了很多加锁,同步的代码,减少了线程之间的竞争资源导致的线程切换,侧面上提高了线程执行效率。

final channelfuture initandregister() {
channel channel = null;
try {
// 通过channel工厂类创建一个channel对象
channel = channelfactory.newchannel();
// 调用init方法对channel进行一些初始化的设置
init(channel);
} catch (throwable t) {
if (channel != null) {
// channel can be null if newchannel crashed (eg socketexception("too many open files"))
channel.unsafe().closeforcibly();
// as the channel is not registered yet we need to force the usage of the globaleventexecutor
return new defaultchannelpromise(channel, globaleventexecutor.instance).setfailure(t);
}
// as the channel is not registered yet we need to force the usage of the globaleventexecutor
return new defaultchannelpromise(new failedchannel(), globaleventexecutor.instance).setfailure(t);
}

    // 注册到eventloopgroup中
    channelfuture regfuture = config().group().register(channel);
    // 如果发生异常,需要关闭已经建立的连接
    if (regfuture.cause() != null) {
        if (channel.isregistered()) {
            channel.close();
        } else {
            channel.unsafe().closeforcibly();
        }
    }

    // if we are here and the promise is not failed, it's one of the following cases:
    // 1) if we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. it's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) if we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. it's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    
    return regfuture;
}

niosocketchannel初始化

default_selector_provider是默认的selectorprovider对象,这时jdk中定义的一个类,主要作用是生成选择器selector对象和通道channel对象

public niosocketchannel() {
    this(default_selector_provider);
}

newsocket中通过调用provider.opensocketchannel()方法创建了一个socketchannel对象,它的默认实现是socketchannelimpl。
public niosocketchannel(selectorprovider provider) {
this(newsocket(provider));
}

然后经过几次调用,最后调用了下面的构造器,首先调用了父类abstractniobytechannel的构造器,
然后创建了一个socketchannelconfig对象,这个类有点类似于门面模式,对niosocketchannel对象和socket对象的一些参数设置和获取的接口进行封装。
public niosocketchannel(channel parent, socketchannel socket) {
super(parent, socket);
config = new niosocketchannelconfig(this, socket.socket());
}

我们在接着看父类abstractniobytechannel的构造方法

abstractniobytechannel(channel parent, selectablechannel ch)

没有做任何工作,直接调用了父类的构造方法,注意这里多了一个参数selectionkey.op_read,这个参数表示channel初始时的感兴趣的事件,channel刚创建好之后对read事件感兴趣
protected abstractniobytechannel(channel parent, selectablechannel ch) {
super(parent, ch, selectionkey.op_read);
}

abstractniochannel(channel parent, selectablechannel ch, int readinterestop)

主要还是调用父类的构造方法

protected abstractniochannel(channel parent, selectablechannel ch, int readinterestop) {
    // 父类构造方法
    super(parent);
    this.ch = ch;
    this.readinterestop = readinterestop;
    try {
        // 设置非阻塞
        ch.configureblocking(false);
    } catch (ioexception e) {
        try {
            // 如果发生异常,关闭该channel
            ch.close();
        } catch (ioexception e2) {
            if (logger.iswarnenabled()) {
                logger.warn(
                        "failed to close a partially initialized socket.", e2);
            }
        }

        throw new channelexception("failed to enter non-blocking mode.", e);
    }
}

abstractchannel(channel parent)

最关键的初始化逻辑在这个最顶层的基类中,其中很重的两个对象unsafe对象和channelpipeline对象,前者封装了jdk底层api的调用,后者是实现netty对事件的链式处理的核心类。

protected abstractchannel(channel parent) {
    this.parent = parent;
    // 创建一个channelid对象,唯一标识该channel
    id = newid();
    // unsafe对象,封装了jdk底层的api调用
    unsafe = newunsafe();
    // 创建一个defaultchannelpipeline对象
    pipeline = newchannelpipeline();
}

小结

前面一小节,我们主要简单分析了一下niosocketchannel的初始化过程,可以看到最主要的逻辑在abstractchannel的构造方法中,这里我们看到了两个重要的类的创建过程。

bootstrap.init

回到abstractbootstrap.initandregister方法中,在完成通过反射调用niosocketchannel构造方法并创建一个实例后,紧接着就要对这个新创建的channel实例进行初始化设置工作,我们看一下bootstrap对新创建的channel的初始化过程:

  • 向channel的pipeline中添加一个处理器,channelpipeline我们可以理解为一个流水线,在这条流水线上有各种各样的处理器,一个channel事件产生后会在这个流水线上进行传播,依次经过所有的处理器
  • 设置参数,也就是以channeloption为key的一些参数,可以通过defaultchannelconfig.setoption方法看到具体可以设置哪些参数。
  • 设置属性

    void init(channel channel) throws exception {
    channelpipeline p = channel.pipeline();
    // 向channelpipeline中添加一个处理器,这个处理器就是我们之前设置的处理器
    p.addlast(config.handler());

      final map<channeloption<?>, object> options = options0();
      // 设置参数,最终是通过调用socketchannelconfig的一些参数设置接口设置参数
      synchronized (options) {
          setchanneloptions(channel, options, logger);
      }
    
      final map<attributekey<?>, object> attrs = attrs0();
      // 设置属性
      synchronized (attrs) {
          for (entry<attributekey<?>, object> e: attrs.entryset()) {
              channel.attr((attributekey<object>) e.getkey()).set(e.getvalue());
          }
      }

    }

multithreadeventloopgroup.register

在完成channel的创建和初始化之后,我们就要将这个channel注册到一个eventloop中,nionioeventloop继承自multithreadeventloopgroup, 通过调用singlethreadeventloop的register方法完成注册

public channelfuture register(channel channel) {
    return next().register(channel);
}

可以看到,通过next()方法选出了其中的一个eventloop进行注册。multithreadeventloopgroup是对多个真正的eventloopgroup的封装,每个实现了实际功能的真正的eventloopgroup运行在一个线程内,
所以我们接下来应该看单个的eventloopgroup的注册方法。

singlethreadeventloop.register

这里创建了一个defaultchannelpromise对象,用于作为返回值。

public channelfuture register(channel channel) {
    return register(new defaultchannelpromise(channel, this));
}

最终调用了unsafe的register方法将channel绑定到当前的eventloopgroup对象上。
public channelfuture register(final channelpromise promise) {
objectutil.checknotnull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

abstractchannel.abstractunsafe.register

  • 首先是做一些前置检查,包括变量非空检查,重复注册检查,检查channel类型和eventloopgroup类型是否匹配
  • 将这个channel绑定到指定的eventloop对象上,
  • 调用register0完成注册

      public final void register(eventloop eventloop, final channelpromise promise) {
          // 做一些非空检查
          if (eventloop == null) {
              throw new nullpointerexception("eventloop");
          }
          // 如果重复注册,通过future对象抛出一个异常
          // 一个channel只能注册到一个eventloopgroup对象上
          if (isregistered()) {
              promise.setfailure(new illegalstateexception("registered to an event loop already"));
              return;
          }
          // 检查channel类型和eventloopgroup类型是否匹配
          if (!iscompatible(eventloop)) {
              promise.setfailure(
                      new illegalstateexception("incompatible event loop type: " + eventloop.getclass().getname()));
              return;
          }
    
          // 将channel内部的eventloop成员设置为相应的对象
          // 也就是将这个channel绑定到指定顶eventloop上
          abstractchannel.this.eventloop = eventloop;
    
          // 这里做了一个判断,如果当前处于eventloop对应的线程内,那么直接执行代码
          // 如果当前运行的线程与eventloop不是同一个,那么将这个注册的任务添加到eventloop的任务队列中
          if (eventloop.ineventloop()) {
              register0(promise);
          } else {
              try {
                  eventloop.execute(new runnable() {
                      @override
                      public void run() {
                          register0(promise);
                      }
                  });
              } catch (throwable t) {
                  logger.warn(
                          "force-closing a channel whose registration task was not accepted by an event loop: {}",
                          abstractchannel.this, t);
                  closeforcibly();
                  closefuture.setclosed();
                  safesetfailure(promise, t);
              }
          }
      }

abstractchannel.abstractunsafe.register0

这个方法实现了实际的注册逻辑,

  • 依然要做一些前置的设置和检查工作,包括在注册过程中不可取消,检查channel是否存活,
  • 调用jdk的api完成注册。例如,对于jdk nio的通道的注册就是调用selectablechannel.register(selector sel, int ops, object att)
  • 调用所有的已添加的处理器节点的channelhandler.handleradded方法,实际上这也会调用handler.handlerremoved方法,如果在此之前有handler被移除掉的话
  • 通知future对象已经注册成功了
  • 触发一个channel注册成功的事件,这个事件会在pipeline中传播,所有注册的handler会依次接收到该事件并作出相应的处理
  • 如果是第一次注册,还需要触发一个channel存活的事件,让所有的handler作出相应的处理

      private void register0(channelpromise promise) {
          try {
              // check if the channel is still open as it could be closed in the mean time when the register
              // call was outside of the eventloop
              // 将channelpromise设置为不可取消,并检查channel是否还存活,通过内部的jdk的channel检查是否存活
              if (!promise.setuncancellable() || !ensureopen(promise)) {
                  return;
              }
              // 是否第一次注册,
              // todo 说明情况下会注册多次??
              boolean firstregistration = neverregistered;
              // 完成实际的注册,即底层api的调用
              // 如果对于jdk nio的通道的注册就是调用selectablechannel.register(selector sel, int ops, object att)
              doregister();
              // 更新标志变量
              neverregistered = false;
              registered = true;
    
              // ensure we call handleradded(...) before we actually notify the promise. this is needed as the
              // user may already fire events through the pipeline in the channelfuturelistener.
              // 调用所有的已添加的处理器节点的channelhandler.handleradded方法
              pipeline.invokehandleraddedifneeded();
    
              // 通过future对象已经注册成功了
              safesetsuccess(promise);
              // 触发一个channel注册成功的事件,这个事件会在pipeline中传播,
              // 所有注册的handler会依次接收到该事件并作出相应的处理
              pipeline.firechannelregistered();
              // only fire a channelactive if the channel has never been registered. this prevents firing
              // multiple channel actives if the channel is deregistered and re-registered.
              if (isactive()) {
                  if (firstregistration) {
                      // 如果是第一次注册,还需要触发一个channel存活的事件,让所有的handler作出相应的处理
                      pipeline.firechannelactive();
                  } else if (config().isautoread()) {
                      // this channel was registered before and autoread() is set. this means we need to begin read
                      // again so that we process inbound data.
                      //
                      // see https://github.com/netty/netty/issues/4805
                      // 开始接收读事件
                      // 对于nio类型的channel, 通过调用jdk的相关api注册读事件为感兴趣的事件
                      beginread();
                  }
              }
          } catch (throwable t) {
              // close the channel directly to avoid fd leak.
              closeforcibly();
              closefuture.setclosed();
              safesetfailure(promise, t);
          }
      }

小结

到此,我们就完成了对channel的创建,初始化,和注册到eventloop过程的分析,整个过程看下来,其实并不复杂,只不过代码的嵌套比较深,继承结构复杂,有些简单的功能可能要看好几层才能找到真正实现的地方,所以还需要耐心和熟悉。这里,我把主干逻辑再提炼一下,去掉所有细枝末节的逻辑,一遍能有一个整体的认识:

  • 首先通过反射创建了一个niosocketchannel(通过反射调用无参构造器)
  • 然后对channel对象进行初始化,主要是想这个channel的channelpipeline中添加用户设置的handler
  • 最后将这个channel注册到一个eventloop上,注册过程设计jdk底层的selector注册api的调用,调用handler的回调方法,在channelpipeline中触发一个channel注册的事件,这些事件最终回调各个handler对象的channelregistered方法。

接下来,我们回到bootstrap.doresolveandconnect方法中,继续完成建立连接的过程的分析。

bootstrap.doresolveandconnect0

连接的建立在方法doresolveandconnect0中实现:

这个方法的主要工作就是对远程地址进行解析,比如通过dns服务器对域名进行解析,
然后使用解析后的地址进行连接的建立,连接建立调用doconnect方法

private channelfuture doresolveandconnect0(final channel channel, socketaddress remoteaddress,
                                           final socketaddress localaddress, final channelpromise promise) {
    try {
        final eventloop eventloop = channel.eventloop();
        // 获取一个地址解析器
        final addressresolver<socketaddress> resolver = this.resolver.getresolver(eventloop);

        // 如果解析器不支持该地址或者改地址已经被解析过了,那么直接开始创建连接
        if (!resolver.issupported(remoteaddress) || resolver.isresolved(remoteaddress)) {
            // resolver has no idea about what to do with the specified remote address or it's resolved already.
            doconnect(remoteaddress, localaddress, promise);
            return promise;
        }

        // 对远程地址进行解析
        final future<socketaddress> resolvefuture = resolver.resolve(remoteaddress);

        if (resolvefuture.isdone()) {
            final throwable resolvefailurecause = resolvefuture.cause();

            if (resolvefailurecause != null) {
                // failed to resolve immediately
                channel.close();
                promise.setfailure(resolvefailurecause);
            } else {
                // succeeded to resolve immediately; cached? (or did a blocking lookup)
                // 解析成功后进行连接
                doconnect(resolvefuture.getnow(), localaddress, promise);
            }
            return promise;
        }

        // wait until the name resolution is finished.
        // 给future对象添加一个回调,采用异步方法进行连接,
        resolvefuture.addlistener(new futurelistener<socketaddress>() {
            @override
            public void operationcomplete(future<socketaddress> future) throws exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setfailure(future.cause());
                } else {
                    doconnect(future.getnow(), localaddress, promise);
                }
            }
        });
    } catch (throwable cause) {
        promise.tryfailure(cause);
    }
    return promise;
}

bootstrap.doconnect

调用channel的connect方法完成连接过程。
也许是之前看scala代码习惯了,回过头来看java代码感觉很冗余,一大堆代码就表达了那一点逻辑,感觉信息密度太低,现在有很多人认为java会渐渐的没落,而最优可能取代java的语言中,scala绝对是强有力的竞争者之一,没有对比就没有伤害,跟java比,scala语言真的是简洁太多了,几句话就能把所要表达的逻辑精准而又直接地表达出来。好像向声明式编程更靠近了一点。

private static void doconnect(
        final socketaddress remoteaddress, final socketaddress localaddress, final channelpromise connectpromise) {

    // this method is invoked before channelregistered() is triggered.  give user handlers a chance to set up
    // the pipeline in its channelregistered() implementation.
    final channel channel = connectpromise.channel();
    channel.eventloop().execute(new runnable() {
        @override
        public void run() {
            if (localaddress == null) {
                // 调用 channel.connect方法完成连接
                channel.connect(remoteaddress, connectpromise);
            } else {
                channel.connect(remoteaddress, localaddress, connectpromise);
            }
            connectpromise.addlistener(channelfuturelistener.close_on_failure);
        }
    });
}

abstractchannel.connect

public channelfuture connect(socketaddress remoteaddress, channelpromise promise) {
    return pipeline.connect(remoteaddress, promise);
}

defaultchannelpipeline.connect

这里稍微说明一下,tail是整个链条的尾节点,如果对netty比较熟悉的话,应该知道netty对于io事件的处理采用责任链的模式,即用户可以设置多个处理器,这些处理器组成一个链条,io事件在这个链条上传播,被特定的一些处理器所处理,而其中有两个特殊的处理器head和tail,他们分别是这个链条的头和尾,他们的存在主要是为了实现一些特殊的逻辑。

public final channelfuture connect(socketaddress remoteaddress, channelpromise promise) {
    return tail.connect(remoteaddress, promise);
}

abstractchannelhandlercontext.connect

中间经过几个调用之后,最终调用该方法。这里有一句关键代码findcontextoutbound(mask_connect),这个方法的代码我就不贴了,大概说一下它的作用,更为具体的机制等后面分析channelpipeline是在详细说明。这个方法会在处理器链中从后向前遍历,直到找到能够处理connect事件的处理器,能否处理某种类型的事件是通过比特位判断的,每个abstractchannelhandlercontext对象内部有一个int型变量用于存储标志各种类型事件的比特位。一般,connect事件会有头结点head来处理,也就是defaultchannelpipeline.headcontext类,所以我们直接看defaultchannelpipeline.headcontext.connect方法

public channelfuture connect(
        final socketaddress remoteaddress, final socketaddress localaddress, final channelpromise promise) {

    if (remoteaddress == null) {
        throw new nullpointerexception("remoteaddress");
    }
    if (isnotvalidpromise(promise, false)) {
        // cancelled
        return promise;
    }

    // 找到下一个能够进行connect操作的,这里用比特位来标记各种不同类型的操作,
    final abstractchannelhandlercontext next = findcontextoutbound(mask_connect);
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        // 调用abstractchannelhandlercontext.invokeconnect
        next.invokeconnect(remoteaddress, localaddress, promise);
    } else {
        safeexecute(executor, new runnable() {
            @override
            public void run() {
                next.invokeconnect(remoteaddress, localaddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

defaultchannelpipeline.headcontext.connect

public void connect(
            channelhandlercontext ctx,
            socketaddress remoteaddress, socketaddress localaddress,
            channelpromise promise) {
        unsafe.connect(remoteaddress, localaddress, promise);
    }

unsafe对象的赋值:

    headcontext(defaultchannelpipeline pipeline) {
        super(pipeline, null, head_name, headcontext.class);
        unsafe = pipeline.channel().unsafe();
        setaddcomplete();
    }

所以我们直接看unsafe.connect

abstractniochannel.connect

主要逻辑:

  • 状态检查,非空检查
  • 调用doconnect方法进行连接
  • 如果立即就连接成功了,那么将future对象设置为成功
  • 如果超时大于0,会提交一个延迟调度的任务,在超时时间到达后执行这个任务检查是否连接成功,如果为连接成功连接说明连接超时,需要关闭通道
  • 向future对象添加一个回调,在future被外部调用者取消时将通道关闭

可见建立连接的核心方法是doconnect,这是一个抽象方法,我们看niosocketchannel,也就是tcp连接的建立过程,查看abstractniochannel的实现类发现还有udp,sctp等协议

public final void connect(
            final socketaddress remoteaddress, final socketaddress localaddress, final channelpromise promise) {
        // 检查promise状态,channel存活状态
        if (!promise.setuncancellable() || !ensureopen(promise)) {
            return;
        }

        try {
            // 防止重复连接
            if (connectpromise != null) {
                // already a connect in process.
                throw new connectionpendingexception();
            }

            boolean wasactive = isactive();
            // 调用doconnect方法进行连接
            if (doconnect(remoteaddress, localaddress)) {
                // 如果立即就连接成功了,那么将future对象设置为成功
                fulfillconnectpromise(promise, wasactive);
            } else {
                connectpromise = promise;
                requestedremoteaddress = remoteaddress;

                // schedule connect timeout.
                int connecttimeoutmillis = config().getconnecttimeoutmillis();
                // 如果超时大于0,那么会在超时到达后检查是否连接成功
                if (connecttimeoutmillis > 0) {
                    connecttimeoutfuture = eventloop().schedule(new runnable() {
                        @override
                        public void run() {
                            channelpromise connectpromise = abstractniochannel.this.connectpromise;
                            connecttimeoutexception cause =
                                    new connecttimeoutexception("connection timed out: " + remoteaddress);
                            // 如果connectpromise能够标记为失败,说明此时还没有连接成功,也就是连接超时了
                            // 此时需要关闭该通道
                            if (connectpromise != null && connectpromise.tryfailure(cause)) {
                                close(voidpromise());
                            }
                        }
                    }, connecttimeoutmillis, timeunit.milliseconds);
                }

                // 向future对象添加一个回调,在future被外部调用者取消时将通道关闭
                promise.addlistener(new channelfuturelistener() {
                    @override
                    public void operationcomplete(channelfuture future) throws exception {
                        if (future.iscancelled()) {
                            if (connecttimeoutfuture != null) {
                                connecttimeoutfuture.cancel(false);
                            }
                            connectpromise = null;
                            close(voidpromise());
                        }
                    }
                });
            }
        } catch (throwable t) {
            promise.tryfailure(annotateconnectexception(t, remoteaddress));
            closeifclosed();
        }
    }

niosocketchannel.doconnect

  • 首先绑定指定的本地地址
  • 调用socketutils.connect建立连接

    protected boolean doconnect(socketaddress remoteaddress, socketaddress localaddress) throws exception {
    // 绑定指定的本地地址
    if (localaddress != null) {
    dobind0(localaddress);
    }

      // 这个变量标记建立连接的动作是否发起成功
      // 成功发起建立连接的工作并不表示连接已经成功建立
      boolean success = false;
      try {
          // 实际建立连接的语句
          boolean connected = socketutils.connect(javachannel(), remoteaddress);
          if (!connected) {
              selectionkey().interestops(selectionkey.op_connect);
          }
          success = true;
          // 返回连接是否已经成功建立
          return connected;
      } finally {
          if (!success) {
              doclose();
          }
      }

    }

socketutils.connect

可以看到,最终是通过调用jdk的api来实现连接的建立,也就是socketchannel.connect方法

public static boolean connect(final socketchannel socketchannel, final socketaddress remoteaddress)
        throws ioexception {
    try {
        return accesscontroller.doprivileged(new privilegedexceptionaction<boolean>() {
            @override
            public boolean run() throws ioexception {
                // 调用jdk api建立连接,socketchannel.connect
                return socketchannel.connect(remoteaddress);
            }
        });
    } catch (privilegedactionexception e) {
        throw (ioexception) e.getcause();
    }
}

总结

一句话,这代码是真的很深! 非常不直接,初次看的话,如果没有一个代码框架图在旁边参考,很容易迷失在层层的继承结构中,很多代码层层调用,真正有用的逻辑隐藏的很深,所以看这中代码必须要有耐心,有毅力,要有打破砂锅问到底的决心。不过这样的复杂的代码结构好处也是显而易见的,那就是良好的扩展性,你可以在任意层级进行扩展。

总结一下建立连接的过程,我认为可以归结为三个主要的方面:

  • 第一, 实际建立逻辑的代码肯定还是jdk api
  • 第二,这么多方法调用,主要的作用就是迎合框架的要求,本质上是为了代码的扩展性,比如channelpipeline的处理器链
  • 第三,另一个主要的工作就是对future对象的处理,这时实现异步的重要手段,future对象也是外部调用者和对象内部状态之间的连接纽带,调用者通过future对象完成一些功能,如查状态,发出取消动作,实现阻塞等待等。