Spark Learning 74 - Source Code: Introduction to the Endpoint Model (6) - Receiving Messages at an Endpoint


Endpoint receive flow

An Endpoint receives messages according to the following flow:
[Figure: Endpoint message-receiving flow]
The diagram covers two parts: creating the Netty server, and receiving messages.

Creating the Netty server

A small snag here: I could not find the place where an RpcEndpoint news up a NettyRpcEnv, which cost me two days hunting for that first new.

1. Let's walk through the Master as the example; start with its main method.

 def main(argStrings: Array[String]) {
    // Sets up common diagnostic state for daemons; should be called early in main()
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

2.1 Handling the Master's conf and the arguments passed on the java -cp command line

//Sets up common diagnostic state for daemons; should be called early in main(). On Unix-like systems it also registers a signal handler that logs signals.
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)

2.2 Starting the RpcEnv and the Endpoint

 val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

2.2.1 Start the Master and return the Master RpcEnv, the web UI bound port, and the REST server port

 /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // Create the rpcEnv; it mainly receives messages and hands them to the backend for processing
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // This calls NettyRpcEnv's setupEndpoint method (the registration looks duplicated inside?)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

    // Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within the default timeout, throwing an exception on failure.
    // When the Endpoint starts it launches a TransportServer by default, and once startup finishes it runs a one-off synchronous check of RPC availability (askSync with BoundPortsRequest)
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
2.2.1.1 Creating the rpcEnv
// Create the rpcEnv; it mainly receives messages and hands them to the backend for processing
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

This calls the create method of object RpcEnv:

def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    create(name, host, host, port, conf, securityManager, clientMode)
  }

It then continues into:

def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    // Creating an RpcEnv goes through an RpcEnvFactory (Netty by default), which then builds the RpcEnv, as shown below
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      clientMode)
    new NettyRpcEnvFactory().create(config)
  }

which finally calls the create method of class NettyRpcEnvFactory:

 /**
    * After NettyRpcEnvFactory has created the NettyRpcEnv, if clientMode is false (i.e. the server
    * side, the Driver-side RPC endpoint), it wraps the new NettyRpcEnv's startServer call in a
    * function value startNettyRpcEnv (whose return value is (nettyEnv, nettyEnv.address.port)) and
    * passes that function to Utils.startServiceOnPort, i.e. the service is started on the Driver.
    *
    * It is worth reading the source of Utils.startServiceOnPort to see why nettyEnv.startServer is
    * not called directly but wrapped and handed to the utility: binding the service on a port may
    * not succeed on the first attempt, so the utility retries up to a maximum number of times until
    * the service starts, and then returns the port it actually bound to.
    */
  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    // Netty's RPC traffic is based on Java serialization for now; Kryo is not yet supported

    // a. Initialize the JavaSerializer and the NettyRpcEnv; in non-client mode, also start the Netty server
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]

    // Initialize the NettyRpcEnv
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)
    // Check whether we are on the Driver (server) side
    if (!config.clientMode) {
      // startNettyRpcEnv is a function value that will be invoked by startServiceOnPort below.
      // Briefly: this declares a function value whose parameter is an Int (actualPort); the body
      // after => starts the server and finally returns a 2-tuple (NettyRpcEnv, Int)
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        /** Mainly builds the TransportServer and registers with the dispatcher */
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        // Internally this still calls startNettyRpcEnv on the chosen port and returns the nettyEnv object
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
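
As an aside on the startNettyRpcEnv function value above, here is a minimal, self-contained sketch of the same pattern: a start function of type Int => (T, Int) handed to a small retry helper. The helper name, retry count and use of a ServerSocket are made up for illustration; the real logic lives in Utils.startServiceOnPort, dissected further below.

import java.net.{BindException, ServerSocket}
import scala.util.control.NonFatal

object StartOnPortSketch {
  // Try startService on startPort, startPort + 1, ... until it succeeds or maxRetries is exhausted.
  def startOnPort[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)): (T, Int) = {
    for (offset <- 0 to maxRetries) {
      try {
        return startService(startPort + offset)
      } catch {
        case NonFatal(e) if offset < maxRetries =>
          println(s"Port ${startPort + offset} failed (${e.getMessage}), trying the next port")
      }
    }
    throw new BindException(s"Could not bind starting from port $startPort")
  }

  def main(args: Array[String]): Unit = {
    // The "service" here is just a ServerSocket; in Spark it is the NettyRpcEnv's TransportServer.
    val startService: Int => (ServerSocket, Int) = { actualPort =>
      val socket = new ServerSocket(actualPort)
      (socket, socket.getLocalPort)
    }
    val (service, boundPort) = startOnPort(7077, 16, startService)
    println(s"Bound on port $boundPort")
    service.close()
  }
}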
2.2.1.1.1 Creating the NettyRpcEnv
// Initialize the NettyRpcEnv
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)

Here NettyRpcEnv is finally instantiated, and this code lives in org.apache.spark.rpc.netty. Since the new happens here, the first new in the diagram must already have completed. The call path is org.apache.spark.deploy.master.Master.main() ==> org.apache.spark.rpc.RpcEnv.create ==> org.apache.spark.rpc.netty.NettyRpcEnvFactory.create, and no RpcEndpoint shows up anywhere along it. But look at the Master itself: Master extends ThreadSafeRpcEndpoint, and ThreadSafeRpcEndpoint extends RpcEndpoint, so the Master is the RpcEndpoint.

private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
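
Since the Master is just one concrete RpcEndpoint, a minimal sketch of a custom endpoint makes the contract clearer. This is not Spark code: the class name and messages are invented, and because the rpc package is private[spark] such a sketch would have to live in a package under org.apache.spark to compile.

package org.apache.spark.example   // hypothetical package, needed because the rpc API is private[spark]

import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

// A made-up endpoint: replies to "echo" asks and logs one-way sends.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  // Called by the message loop when the OnStart message is processed (see registerRpcEndpoint below)
  override def onStart(): Unit = println("EchoEndpoint started")

  // Handles one-way messages delivered via RpcEndpointRef.send
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"received one-way message: $msg")
  }

  // Handles ask / askSync messages; the reply goes back through the RpcCallContext
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ("echo", payload: String) => context.reply(payload)
  }
}

// Registration mirrors what Master.startRpcEnvAndEndpoint does:
//   val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
//   val answer = ref.askSync[String](("echo", "hello"))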

Now let's look at what new NettyRpcEnv mainly initializes.


Inside this class, the first thing created is the Dispatcher, a message dispatcher:

// The Dispatcher is a message dispatcher responsible for routing RPC messages to the proper endpoint; it has an inner message-loop class
  // The dispatcher is responsible for delivering messages to the relevant Endpoint
  private val dispatcher: Dispatcher = new Dispatcher(this)
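
The real Dispatcher is dissected later (registerRpcEndpoint, the Inbox and the receivers queue). As a preview, here is a heavily simplified, self-contained model of the idea, with invented class names: each endpoint gets an inbox queue, and a shared receivers queue tells the message loop which inbox has pending work.

import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Toy model of the dispatching idea (not Spark's Dispatcher): name -> inbox, plus a work queue.
class ToyDispatcher {
  private class Inbox { val messages = new LinkedBlockingQueue[Any]() }

  private val endpoints = new ConcurrentHashMap[String, Inbox]()
  private val receivers = new LinkedBlockingQueue[Inbox]()   // inboxes with unprocessed messages

  def register(name: String): Unit = endpoints.putIfAbsent(name, new Inbox)

  // "Posting" a message = dropping it into the target endpoint's inbox and flagging the inbox as ready
  def post(endpointName: String, message: Any): Unit = {
    val inbox = endpoints.get(endpointName)
    if (inbox != null) {
      inbox.messages.put(message)
      receivers.put(inbox)
    }
  }

  // A message-loop thread would repeatedly take a ready inbox and process one message from it
  def processNext(handle: Any => Unit): Unit = {
    val inbox = receivers.take()
    val msg = inbox.messages.poll()
    if (msg != null) handle(msg)
  }
}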

Next, a NettyRpcHandler is created and handed to a new TransportContext:

// TransportContext is mainly used to create the TransportServer and the TransportClientFactory
  /** First create the NettyRpcHandler,
      then create the TransportContext
    */
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

Let's look first at the NettyRpcHandler being created. This class mainly holds a routing table; the rest of it is the message-receiving methods, the most important of which is receive (a simplified model of that step is sketched after the snippet below).

// A variable to track the remote RpcEnv addresses of all clients
  // effectively a routing table
  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
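
A simplified, self-contained model of what the receive step amounts to is shown below; the types are invented for illustration (the real NettyRpcHandler works with TransportClient and ByteBuffer and posts the decoded message to the Dispatcher).

import java.util.concurrent.ConcurrentHashMap

// Invented types standing in for RpcAddress / RequestMessage.
final case class Addr(host: String, port: Int)
final case class Request(senderAddress: Addr, receiverEndpoint: String, content: Any)

// Toy handler: remember where each client's RpcEnv lives, then hand the message to a dispatcher.
class ToyRpcHandler(postToDispatcher: Request => Unit) {
  // the "routing table": socket address of the client -> address its RpcEnv advertised
  private val remoteAddresses = new ConcurrentHashMap[Addr, Addr]()

  def receive(clientSocketAddress: Addr, request: Request): Unit = {
    // record the mapping so later events (e.g. disconnects, replies) can be routed back
    remoteAddresses.putIfAbsent(clientSocketAddress, request.senderAddress)
    // the dispatcher routes the message to the named endpoint's inbox
    postToDispatcher(request)
  }
}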

Back in the NettyRpcEnv class, look at the TransportContext it constructs. That class mainly forces creation of the message encoder and decoder:

 /**
   * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
   * before switching the current context class loader to ExecutorClassLoader.
   *
   * Netty's MessageToMessageEncoder uses Javassist to generate a matcher class and the
   * implementation calls "Class.forName" to check if this calls is already generated. If the
   * following two objects are created in "ExecutorClassLoader.findClass", it will cause
   * "ClassCircularityError". This is because loading this Netty generated class will call
   * "ExecutorClassLoader.findClass" to search this class, and "ExecutorClassLoader" will try to use
   * RPC to load it and cause to load the non-exist matcher class again. JVM will report
   * `ClassCircularityError` to prevent such infinite recursion. (See SPARK-17714)
   *
   * Why do we need a MessageEncoder and a MessageDecoder?
   *  In a stream-based transport such as TCP/IP, received data is first stored in a socket receive
   *  buffer. Unfortunately a stream-based transport is not a queue of packets but a queue of bytes:
   *  even if two independent packets are sent, the operating system treats them merely as a run of
   *  bytes rather than as two messages, so there is no guarantee that what the remote side wrote is
   *  read back with the same framing. For example, suppose the operating system's TCP/IP stack has
   *  received three packets: ABC, DEF, GHI. Because of this uniform stream-oriented nature, the
   *  application may read them as fragments such as AB, CDEFG, H, I. The receiver, whether client
   *  or server, therefore has to reassemble the received bytes into one or more frames that are
   *  meaningful and easier for the application logic to understand.
   *
   */
  private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
  private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;

Back in the NettyRpcEnv class, the clientFactory is created:

/** Create the clientFactory */
  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
/**
   * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
   * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
   * to create a Client.
   *
   * TransportClientFactory is the factory for the Netty client TransportClient; TransportClient is
   * used to send RPC requests to the Netty server side, and TransportContext's createClientFactory
   * method is what creates the TransportClientFactory.
   */
  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

The constructor mainly selects the IO mode: nio or epoll (a simplified sketch of that selection follows the constructor code).

/**
   * TransportClientFactory is made up of:
   *    1. clientBootstraps: the list of bootstraps run against every new client connection;
   *    2. connectionPool: a cache of client connections;
   *    3. numConnectionsPerPeer: the number of connections used to fetch data between peers, configurable via spark.shuffle.io.numConnectionsPerPeer, default 1;
   *    4. socketChannelClass: the channel class used when a client channel is created, configurable via spark.shuffle.io.mode, default NioSocketChannel;
   *    5. workerGroup: following Netty's convention a client only has a worker group, so only a workerGroup is created here (in practice a NioEventLoopGroup);
   *    6. pooledAllocator: a pooled ByteBuf allocator with thread-local caching disabled;
   *    7. TransportClientFactory makes heavy use of NettyUtils.
   *
   * Note: NIO is short for Java's "New IO". It provides Buffer support for all primitive types,
   *      charset encode/decode solutions, a new IO abstraction called Channel, access to file
   *      locking and memory-mapped files, and highly scalable multiplexed non-blocking network IO.
   *      Its concrete usage belongs to the Java language itself.
   *
   */
  public TransportClientFactory(
      TransportContext context,
      List<TransportClientBootstrap> clientBootstraps) {
    this.context = Preconditions.checkNotNull(context);
    this.conf = context.getConf();
    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
    this.connectionPool = new ConcurrentHashMap<>();
    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
    this.rand = new Random();

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
    this.workerGroup = NettyUtils.createEventLoop(
        ioMode,
        conf.clientThreads(),
        conf.getModuleName() + "-client");
    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
  }
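
The nio/epoll choice boils down to picking a channel class and an EventLoopGroup implementation. Below is a rough analogue of what NettyUtils does, assuming Netty (and its native epoll transport) is on the classpath; the object and method names here are invented for illustration.

import java.util.concurrent.ThreadFactory

import io.netty.channel.EventLoopGroup
import io.netty.channel.epoll.{EpollEventLoopGroup, EpollSocketChannel}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.util.concurrent.DefaultThreadFactory

object IoModeSketch {
  // Channel class used when creating client channels, depending on the configured IO mode
  def clientChannelClass(ioMode: String): Class[_ <: SocketChannel] = ioMode.toUpperCase match {
    case "NIO"   => classOf[NioSocketChannel]
    case "EPOLL" => classOf[EpollSocketChannel]   // Linux only, needs the netty native epoll transport
    case other   => throw new IllegalArgumentException(s"Unknown IO mode: $other")
  }

  // Event loop group with named daemon threads, again depending on the IO mode
  def createEventLoop(ioMode: String, numThreads: Int, prefix: String): EventLoopGroup = {
    val tf: ThreadFactory = new DefaultThreadFactory(prefix, true /* daemon */)
    ioMode.toUpperCase match {
      case "NIO"   => new NioEventLoopGroup(numThreads, tf)
      case "EPOLL" => new EpollEventLoopGroup(numThreads, tf)
      case other   => throw new IllegalArgumentException(s"Unknown IO mode: $other")
    }
  }
}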

Back in the NettyRpcEnv class, the outboxes map is created (a simplified model of the outbox idea follows the snippet):

/**
   * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
   * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
    *
    * That covers how local messages are handled; next is how remote message bodies are handled.
    * At the bottom these messages all ride on Netty's writeAndFlush on the channel, covering both
    * one-way and request/reply message bodies. First understand outboxes: every node keeps an
    * outboxes map holding the outbox that corresponds to each peer node. If the receiver is not a
    * local address, the message is sent straight to that peer's outbox and a thread later drains it.
    *
    * //  When calling a remote Ref we only need to connect to the matching remote Rpc address and
    * //  drop the message into its Outbox to await consumption, which avoids blocking the thread.
    * //  outboxes is again a Java ConcurrentHashMap keyed by Rpc address with its matching Outbox;
    * //  the Outbox wraps the messages queue, the TransportClient, and the message-handling logic.
   */
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
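
To make the non-blocking send concrete, here is a small self-contained model of the outbox idea, with invented names: the sender only appends to the right peer's outbox, and a separate drain step actually pushes messages over the wire.

import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

final case class PeerAddress(host: String, port: Int)

// Toy outbox: a queue of pending messages for one remote peer.
class ToyOutbox {
  private val messages = new ConcurrentLinkedQueue[Any]()
  def send(msg: Any): Unit = messages.add(msg)            // non-blocking append, no network I/O here
  def drain(transmit: Any => Unit): Unit = {
    var msg = messages.poll()
    while (msg != null) { transmit(msg); msg = messages.poll() }   // the actual write happens here
  }
}

class ToyRemoteSender {
  private val outboxes = new ConcurrentHashMap[PeerAddress, ToyOutbox]()

  // send never blocks the caller: it just drops the message into the peer's outbox
  def send(to: PeerAddress, msg: Any): Unit = {
    val outbox = {
      val existing = outboxes.get(to)
      if (existing != null) existing
      else {
        val fresh = new ToyOutbox
        val raced = outboxes.putIfAbsent(to, fresh)   // same putIfAbsent trick NettyRpcEnv uses
        if (raced != null) raced else fresh
      }
    }
    outbox.send(msg)
  }

  // in Spark the drain is driven by the Outbox itself once a TransportClient connection exists
  def flush(to: PeerAddress)(transmit: Any => Unit): Unit = {
    val outbox = outboxes.get(to)
    if (outbox != null) outbox.drain(transmit)
  }
}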

Back in org.apache.spark.rpc.netty.NettyRpcEnvFactory's create method, the TransportServer is started:

 // startNettyRpcEnv is a function value that will be invoked by startServiceOnPort below.
      // Briefly: this declares a function value whose parameter is an Int (actualPort); the body
      // after => starts the server and finally returns a 2-tuple (NettyRpcEnv, Int)
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        /** Mainly builds the TransportServer and registers with the dispatcher */
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      // Check whether authentication for Spark's communication protocol is enabled.
      if (securityManager.isAuthenticationEnabled()) {
        // Authenticate using Spark's auth protocol
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    // Create the TransportServer
    server = transportContext.createServer(bindAddress, port, bootstraps)
    // Create the RpcEndpointVerifier, register it with the dispatcher, and get back its Ref
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

First, look at:

// Create the TransportServer
    server = transportContext.createServer(bindAddress, port, bootstraps)
/** Create a server which will attempt to bind to a specific host and port. */
  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }
/**
   * Creates a TransportServer that binds to the given host and the given port, or to any available
   * if 0. If you don't want to bind to any special host, set "hostToBind" to null.
   * */
  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    this.context = context;
    this.conf = context.getConf();
    this.appRpcHandler = appRpcHandler;
    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

    try {
      // Initialize
      init(hostToBind, portToBind);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(this);
      throw e;
    }
  }

The initialization:

 private void init(String hostToBind, int portToBind) {
    // IO mode: nio or epoll
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    // Create a Netty EventLoopGroup based on the IOMode (I am not entirely sure about this part)
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    // Event loop group
    EventLoopGroup workerGroup = bossGroup;

    // Create a pooled byte buffer allocator (here with allowCache = true on the server side)
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }

    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }

    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        // Initialize the pipeline
        context.initializePipeline(ch, rpcHandler);
      }
    });

    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    // Bind the address and port
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    // Worth checking later what this "Shuffle server started on port" log actually prints
    logger.debug("Shuffle server started on port: {}", port);
  }

The method to focus on is initializePipeline:

 // Initialize the pipeline
        context.initializePipeline(ch, rpcHandler);
/**
   * Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
   * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
   * response messages.
   *
   * @param channel The channel to initialize.
   * @param channelRpcHandler The RPC handler to use for the channel.
   *
   * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
   * be used to communicate on this channel. The TransportClient is directly associated with a
   * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
   */
  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      // Create the TransportChannelHandler
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
        // would require more logic to guarantee if this were not part of the same event loop.
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

Let's look at:

// Create the TransportChannelHandler
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);

This is where the following are created:
TransportResponseHandler,
TransportClient,
TransportRequestHandler,
TransportChannelHandler

This is where receiving messages comes into play.

  1. TransportResponseHandler: handles responses from the server to requests issued by a TransportClient. It works by tracking the list of outstanding requests (and their callbacks).
  2. TransportRequestHandler: handles requests sent by clients and writes chunk data or RPC responses back on the channel.
  3. TransportClient: a client for fetching consecutive chunks of a pre-negotiated stream. The API is intended for efficiently transferring large amounts of data, broken into chunks of a few hundred KB to a few MB.
  4. TransportChannelHandler: its internal implementation is simple; it wraps the responseHandler and the requestHandler, and when a Message is read from Netty it routes it to the appropriate responseHandler or requestHandler (a simplified routing sketch follows the createChannelHandler code below).
 /**
   * Creates the server- and client-side handler which is used to handle both RequestMessages and
   * ResponseMessages. The channel is expected to have been successfully created, though certain
   * properties (such as the remoteAddress()) may not be available yet.
   */
  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    //  Create the TransportResponseHandler, the response handler
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    // Create the transport client
    TransportClient client = new TransportClient(channel, responseHandler);
    // TransportRequestHandler, which encapsulates the handling of incoming requests and their responses;
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler);
    // Create the TransportChannelHandler
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }
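
The routing mentioned in point 4 above can be pictured with a small self-contained sketch. The message types are invented; the real TransportChannelHandler pattern-matches on the RequestMessage / ResponseMessage hierarchy read off the Netty channel.

// Invented message hierarchy standing in for Spark's RequestMessage / ResponseMessage.
sealed trait Message
final case class RpcRequest(requestId: Long, body: Array[Byte]) extends Message
final case class RpcResponse(requestId: Long, body: Array[Byte]) extends Message

class ToyRequestHandler  { def handle(r: RpcRequest): Unit  = println(s"serving request ${r.requestId}") }
class ToyResponseHandler { def handle(r: RpcResponse): Unit = println(s"completing request ${r.requestId}") }

// Toy analogue of TransportChannelHandler: one object per channel that owns both handlers
// and routes every decoded message to the right one.
class ToyChannelHandler(requestHandler: ToyRequestHandler, responseHandler: ToyResponseHandler) {
  def channelRead(msg: Message): Unit = msg match {
    case req: RpcRequest   => requestHandler.handle(req)    // we are acting as the server here
    case resp: RpcResponse => responseHandler.handle(resp)  // reply to a request we issued earlier
  }
}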

Then the address and port are bound:

 // Bind the address and port
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

Back in NettyRpcEnv's startServer, the RpcEndpointVerifier is created and registered:

// Create the RpcEndpointVerifier, register it with the dispatcher, and get back its Ref
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
/**
 * An [[RpcEndpoint]] for remote [[RpcEnv]]s to query if an `RpcEndpoint` exists.
 *
 * This is used when setting up a remote endpoint reference.
 */
private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}
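
How is this verifier used? When another node wants a reference to a named endpoint (for example a Worker looking up the Master), it calls rpcEnv.setupEndpointRef, and under the hood the NettyRpcEnv first asks the remote RpcEndpointVerifier whether that name exists. A hedged sketch of the caller side follows; it is only valid inside a package under org.apache.spark (the RPC API is private[spark]), and the host and port are placeholders.

import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

object LookupSketch {
  // rpcEnv is assumed to be an already-created RpcEnv on the calling node
  def lookupMaster(rpcEnv: RpcEnv): RpcEndpointRef = {
    // Resolves the remote endpoint named "Master"; internally this triggers the
    // RpcEndpointVerifier.CheckExistence round trip shown above before the ref is returned.
    rpcEnv.setupEndpointRef(RpcAddress("master-host", 7077), "Master")
  }
}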

Back in NettyRpcEnvFactory's create method, startNettyRpcEnv is invoked through Utils.startServiceOnPort on the chosen port, and the nettyEnv object is returned:

try {
        // Internally this still calls startNettyRpcEnv on the chosen port and returns the nettyEnv object
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
/**
   * Attempt to start a service on the given port, or fail after a number of attempts.
   * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
   *
   * @param startPort The initial port to start the service on.
   * @param startService Function to start service on a given port.
   *                     This is expected to throw java.net.BindException on port collision.
   * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
   * @param serviceName Name of the service.
   * @return (service: T, port: Int)
   */
  /**
  At startup the port is retried upward until binding succeeds, e.g. starting from 7077 and
  ending up as "master at spark://biluos.com:7079"
   */
  def startServiceOnPort[T](
      startPort: Int,
      startService: Int => (T, Int),
      conf: SparkConf,
      serviceName: String = ""): (T, Int) = {

    /** The port must be between 1024 and 65535 (inclusive), or 0 for a random free port */
    require(startPort == 0 || (1024 <= startPort && startPort < 65536),
      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")

    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    val maxRetries = portMaxRetries(conf)
    for (offset <- 0 to maxRetries) {
      // Do not increment port if startPort is 0, which is treated as a special port
      val tryPort = if (startPort == 0) {
        startPort
      } else {
        userPort(startPort, offset)
      }
      try {
        val (service, port) = startService(tryPort)
        // Example log output:
        // 17/12/05 11:56:50 INFO Utils: Successfully started service 'sparkDriver' on port 55271.
        // 17/12/05 11:56:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
        // 17/12/05 11:56:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55290.
        logInfo(s"Successfully started service$serviceString on port $port.")
        return (service, port)
      } catch {
        case e: Exception if isBindCollision(e) =>
          if (offset >= maxRetries) {
            val exceptionMessage = if (startPort == 0) {
              s"${e.getMessage}: Service$serviceString failed after " +
                s"$maxRetries retries (on a random free port)! " +
                s"Consider explicitly setting the appropriate binding address for " +
                s"the service$serviceString (for example spark.driver.bindAddress " +
                s"for SparkDriver) to the correct binding address."
            } else {
              s"${e.getMessage}: Service$serviceString failed after " +
                s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
                s"the appropriate port for the service$serviceString (for example spark.ui.port " +
                s"for SparkUI) to an available port or increasing spark.port.maxRetries."
            }
            val exception = new BindException(exceptionMessage)
            // restore original stack trace
            exception.setStackTrace(e.getStackTrace)
            throw exception
          }
          if (startPort == 0) {
            // As startPort 0 is for a random free port, it is most possibly binding address is
            // not correct.
            logWarning(s"Service$serviceString could not bind on a random free port. " +
              "You may check whether configuring an appropriate binding address.")
          } else {
            logWarning(s"Service$serviceString could not bind on port $tryPort. " +
              s"Attempting port ${tryPort + 1}.")
          }
      }
    }
    // Should never happen
    throw new SparkException(s"Failed to start service$serviceString on port $startPort")
  }

Then we return to org.apache.spark.deploy.master.Master.startRpcEnvAndEndpoint:

 // This calls NettyRpcEnv's setupEndpoint method (the registration looks duplicated inside?)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

This registers the Master (an RpcEndpoint) and returns masterEndpoint, its RpcEndpointRef:

/**
    * Register the RpcEndpoint under its name in the RpcEnv and return a reference (RpcEndpointRef) to it
    */
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    // The Dispatcher is a message dispatcher responsible for routing RPC messages to the proper endpoint; it has an inner message-loop class
    dispatcher.registerRpcEndpoint(name, endpoint)
  }
/**
    * Register the endpoint with the NettyRpcEnv and return its Ref
    */
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    // Get the nettyEnv address
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    // Create the NettyRpcEndpointRef, which extends the abstract superclass RpcEndpointRef
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      // If the dispatcher (the message router) has already stopped, no messages can be delivered
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      // The argument expression creates an EndpointData, which creates the endpoint's Inbox and adds
      // the first inbox message: messages.add(OnStart)
      // putIfAbsent: if the key is not yet associated with a value, associate it with the given value; roughly equivalent to
      // if (!map.containsKey(key))
      //    return map.put(key, value);
      //  else
      //    return map.get(key);
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      // Look up the EndpointData for this endpoint name
      val data = endpoints.get(name)
      // Store the endpoint together with its ref, caching the RpcEndpoint relationship
      endpointRefs.put(data.endpoint, data.ref)
      // Finally add the EndpointData to receivers; offer appends to the tail and, unlike put (which
      // blocks) or add (which throws), simply returns false if the queue is full
      receivers.offer(data)  // for the OnStart message
    }
    // Return the endpointRef
    endpointRef
  }
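
The comment above notes that the EndpointData's Inbox starts life with an OnStart message already enqueued, and that receivers.offer is what makes the message loop pick it up. Below is a small self-contained model of how an inbox processes its messages; the types are invented, and the real Inbox also handles OnStop, RemoteProcessConnected and other internal messages.

import java.util.LinkedList

// Invented message types standing in for Spark's InboxMessage hierarchy.
sealed trait ToyInboxMessage
case object OnStart extends ToyInboxMessage
final case class OneWay(content: Any) extends ToyInboxMessage
final case class Ask(content: Any, reply: Any => Unit) extends ToyInboxMessage

// Minimal endpoint contract mirroring onStart / receive / receiveAndReply.
trait ToyEndpoint {
  def onStart(): Unit
  def receive(content: Any): Unit
  def receiveAndReply(content: Any, reply: Any => Unit): Unit
}

// Toy inbox: created with OnStart already queued, just like EndpointData's Inbox.
class ToyInbox(endpoint: ToyEndpoint) {
  private val messages = new LinkedList[ToyInboxMessage]()
  messages.add(OnStart)   // the first message ever processed is OnStart

  def post(msg: ToyInboxMessage): Unit = synchronized { messages.add(msg) }

  // Called by the message loop once this inbox is taken off the receivers queue.
  def process(): Unit = {
    var msg = synchronized { messages.poll() }
    while (msg != null) {
      msg match {
        case OnStart             => endpoint.onStart()
        case OneWay(content)     => endpoint.receive(content)
        case Ask(content, reply) => endpoint.receiveAndReply(content, reply)
      }
      msg = synchronized { messages.poll() }
    }
  }
}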

Finally, a test message is sent:

// Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within the default timeout, throwing an exception on failure.
    // When the Endpoint starts it launches a TransportServer by default, and once startup finishes it runs a one-off synchronous check of RPC availability (askSync with BoundPortsRequest)
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
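
askSync blocks the caller until the endpoint replies or the timeout expires. Here is a tiny self-contained model of those semantics; it is not Spark code (the real implementation goes through the Outbox, the remote Inbox, and a Future), and the message types are invented stand-ins for BoundPortsRequest / BoundPortsResponse.

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}

// Invented messages mirroring the BoundPortsRequest / BoundPortsResponse round trip.
case object ToyBoundPortsRequest
final case class ToyBoundPortsResponse(rpcPort: Int, webUIPort: Int, restPort: Option[Int])

object AskSyncSketch {
  // "ask": deliver the request to a handler that will complete the promise, possibly on another thread
  def ask[T](handler: (Any, Promise[T]) => Unit, request: Any): Future[T] = {
    val p = Promise[T]()
    handler(request, p)
    p.future
  }

  // "askSync": ask, then block for the reply within a timeout, throwing on failure or timeout
  def askSync[T](handler: (Any, Promise[T]) => Unit, request: Any, timeout: FiniteDuration): T =
    Await.result(ask(handler, request), timeout)

  def main(args: Array[String]): Unit = {
    // A stand-in for the Master's receiveAndReply handling of BoundPortsRequest
    val handler: (Any, Promise[ToyBoundPortsResponse]) => Unit = {
      case (ToyBoundPortsRequest, p) => p.success(ToyBoundPortsResponse(7077, 8080, Some(6066)))
      case (_, p)                    => p.failure(new IllegalArgumentException("unexpected message"))
    }
    val resp = askSync(handler, ToyBoundPortsRequest, 10.seconds)
    println(resp)
  }
}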