欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark RPC框架源码分析(二)RPC运行时序

程序员文章站 2022-12-15 10:58:43
Spark RPC 框架的运行时序是怎样的呢?让我们深入到它的源码里面去看看~~ ......

前情提要:

一. spark rpc概述

上一篇我们已经说明了spark rpc框架的一个简单例子,spark rpc相关的两个编程模型,actor模型和reactor模型以及一些常用的类。这一篇我们还是用上一篇的例子,从代码的角度讲述spark rpc的运行时序,从而揭露spark rpc框架的运行原理。我们主要将分成两部分来讲,分别从服务端的角度和客户端的角度深度解析。

不过源码解析部分都是比较枯燥的,spark rpc这里也是一样,其中很多东西都是绕来绕去,墙裂建议使用上一篇中介绍到的那个spark rpc项目,下载下来并运行,通过断点的方式来一步一步看,结合本篇文章,你应该会有更大的收获。

ps:所用spark版本:spark2.1.0

二. spark rpc服务端

我们将以上一篇helloworldserver为线索,深入到spark rpc框架内部的源码中,来看看启动一个服务时都做了些什么。

因为代码部分都是比较绕的,每个类也经常会搞不清楚,我在介绍一个方法的源码时,通常都会将类名也一并写出来,这样应该会更加清晰一些。

helloworldserver{
  ......
  def main(args: array[string]): unit = {
    //val host = args(0)
    val host = "localhost"
    val config = rpcenvserverconfig(new rpcconf(), "hello-server", host, 52345)
    val rpcenv: rpcenv = nettyrpcenvfactory.create(config)
    val helloendpoint: rpcendpoint = new helloendpoint(rpcenv)
    rpcenv.setupendpoint("hello-service", helloendpoint)
    rpcenv.awaittermination()
  }
  ......
}

Spark RPC框架源码分析(二)RPC运行时序

这段代码中有两个主要流程,我们分别来说

2.1 服务端nettyrpcenvfactory.create(config)

首先是下面这条代码的运行流程:

val rpcenv: rpcenv = nettyrpcenvfactory.create(config)

其实就是通过 nettyrpcenvfactory 创建出一个 rpc environment ,其具体类是 nettyrpcenv 。

我们再来看看创建过程中会发生什么。

object nettyrpcenvfactory extends rpcenvfactory {
    ......
    def create(config: rpcenvconfig): rpcenv = {
        val conf = config.conf
    
        // use javaserializerinstance in multiple threads is safe. however, if we plan to support
        // kryoserializer in future, we have to use threadlocal to store serializerinstance
        val javaserializerinstance =
        new javaserializer(conf).newinstance().asinstanceof[javaserializerinstance]
        //根据配置以及地址,new 一个 nettyrpcenv ,
        val nettyenv =
        new nettyrpcenv(conf, javaserializerinstance, config.bindaddress)
        //如果是服务端创建的,那么会启动服务。服务端和客户端都会通过这个方法创建一个 nettyrpcenv ,但区别就在这里了。
        if (!config.clientmode) {
        val startnettyrpcenv: int => (nettyrpcenv, int) = { actualport =>
            //启动服务的方法,下一步就是调用这个方法了
            nettyenv.startserver(config.bindaddress, actualport)
            (nettyenv, nettyenv.address.port)
        }
        try {
            utils.startserviceonport(config.port, startnettyrpcenv, conf, config.name)._1
        } catch {
            case nonfatal(e) =>
            nettyenv.shutdown()
            throw e
        }
        }
        nettyenv
    }
    ......
}

还没完,如果是服务端调用这段代码,那么主要的功能是创建rpcenv,即nettyrpcenv(客户端在后面说)。以及通过下面这行代码,

nettyenv.startserver(config.bindaddress, actualport)

去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看。

class nettyrpcenv(
                   val conf: rpcconf,
                   javaserializerinstance: javaserializerinstance,
                   host: string) extends rpcenv(conf) {
  ......
  def startserver(bindaddress: string, port: int): unit = {
    // here disable security
    val bootstraps: java.util.list[transportserverbootstrap] = java.util.collections.emptylist()
    //transportcontext 属于 spark.network 中的部分,负责 rpc 消息在网络中的传输
    server = transportcontext.createserver(bindaddress, port, bootstraps)
    //在每个 rpcendpoint 注册的时候都会注册一个默认的 rpcendpointverifier,它的作用是客户端调用的时候先用它来询问 endpoint 是否存在。
    dispatcher.registerrpcendpoint(
      rpcendpointverifier.name, new rpcendpointverifier(this, dispatcher))
  }
  ......
}

执行完毕之后这个create方法就结束。这个流程主要就是开启一些服务,然后返回一个新的nettyrpcenv。

2.2 服务端rpcenv.setupendpoint("hello-service",helloendpoint)

这条代码会去调用nettyrpcenv中相应的方法

class nettyrpcenv(
                   val conf: rpcconf,
                   javaserializerinstance: javaserializerinstance,
                   host: string) extends rpcenv(conf) {
  ......
  override def setupendpoint(name: string, endpoint: rpcendpoint): rpcendpointref = {
    dispatcher.registerrpcendpoint(name, endpoint)
  }
  ......
}

我们看到,这个方法主要是调用dispatcher进行注册的。dispatcher的功能上一节已经说了,

dispatcher的主要作用是保存注册的rpcendpoint、分发相应的message到rpcendpoint中进行处理。dispatcher即是上图中threadpool的角色。它同时也维系一个threadpool,用来处理每次接受到的 inboxmessage。而这里处理inboxmessage是通过inbox实现的。

这里我们就说一说dispatcher的流程。

dispatcher

dispatcher在nettyrpcenv被创建的时候创建出来。

class nettyrpcenv(
                   val conf: rpcconf,
                   javaserializerinstance: javaserializerinstance,
                   host: string) extends rpcenv(conf) {
    ......
    //初始化时创建 dispatcher
    private val dispatcher: dispatcher = new dispatcher(this)
    ......
}

dispatcher类被创建的时候也有几个属性需要注意:

private[netty] class dispatcher(nettyenv: nettyrpcenv) {
    ......
    //每个 rpcendpoint 其实都会被整合成一个 endpointdata 。并且每个 rpcendpoint 都会有一个 inbox。
    private class endpointdata(
                                val name: string,
                                val endpoint: rpcendpoint,
                                val ref: nettyrpcendpointref) {
        val inbox = new inbox(ref, endpoint)
    }
    
    //一个阻塞队列,当有 rpcendpoint 相关请求(inboxmessage)的时候,就会将请求塞到这个队列中,然后被线程池处理。
    private val receivers = new linkedblockingqueue[endpointdata]
    
    //初始化便创建出来的线程池,当上面的 receivers 队列中没内容时,会阻塞。当有 rpcendpoint 相关请求(即 inboxmessage )的时候就会立刻执行。
    //这里处理 inboxmessage 本质上是调用相应 rpcendpoint 的 inbox 去处理。
    private val threadpool: threadpoolexecutor = {
        val numthreads = nettyenv.conf.getint("spark.rpc.netty.dispatcher.numthreads",
        math.max(2, runtime.getruntime.availableprocessors()))
        val pool = threadutils.newdaemonfixedthreadpool(numthreads, "dispatcher-event-loop")
        for (i <- 0 until numthreads) {
            pool.execute(new messageloop)
        }
        pool
    }
    ......
}

了解一些dispatcher的逻辑流程后,我们来正式看看dispatcher的registerrpcendpoint方法。

顾名思义,这个方法就是将rpcendpoint注册到dispatcher中去。当有message到来的时候,便会分发message到相应的rpcendpoint中进行处理。

private[netty] class dispatcher(nettyenv: nettyrpcenv) {
  ......
  def registerrpcendpoint(name: string, endpoint: rpcendpoint): nettyrpcendpointref = {
    val addr = rpcendpointaddress(nettyenv.address, name)
    //注册 rpcendpoint 时需要的是 上面的 endpointdata ,其中就包含 endpointref ,这个主要是供客户端使用的。
    val endpointref = new nettyrpcendpointref(nettyenv.conf, addr, nettyenv)
    //多线程环境下,注册一个 rpcendpoint 需要判断现在是否处于 stop 状态。
    synchronized {
      if (stopped) {
        throw new illegalstateexception("rpcenv has been stopped")
      }
      //新建 endpointdata 并存储到一个 concurrentmap 中。
      if (endpoints.putifabsent(name, new endpointdata(name, endpoint, endpointref)) != null) {
        throw new illegalargumentexception(s"there is already an rpcendpoint called $name")
      }
      val data = endpoints.get(name)
      endpointrefs.put(data.endpoint, data.ref)
      //将 这个 endpointdata 加入到 receivers 队列中,此时 dispatcher 中的 threadpool 会去处理这个加进来的 endpointdata 
      //处理过程是调用它的 inbox 的 process()方法。然后 inbox 会等待消息到来。
      receivers.offer(data) // for the onstart message
    }
    endpointref
  }
  ......
}

spark rpc服务端逻辑小结:我们说明了spark rpc服务端启动的逻辑流程,分为两个部分,第一个是rpc env,即nettyrpcenv的创建过程,第二个则是rpcendpoint注册到dispatcher的流程。
1. nettyrpcenvfactory 创建 nettyrpcenv

  • 根据地址创建nettyrpcenv。
  • nettyrpcenv开始启动服务,包括transportcontext根据地址开启监听服务,向dispacther注册一个rpcendpointverifier等待。

2. dispatcher注册rpcendpoint

  • dispatcher初始化时便创建一个线程池并阻塞等待receivers队列中加入新的endpointdata
  • 一旦新加入endpointdata便会调用该endpointdata的inbox去处理消息。比如onstart消息,或是rpcmessage等等。

三.spark rpc客户端

依旧是以上一节 helloworld 的客户端为线索,我们来逐层深入在 rpc 中,客户端 helloworldclient 的 asynccall() 方法。

object helloworldclient {
  ......
  def asynccall() = {
    val rpcconf = new rpcconf()
    val config = rpcenvclientconfig(rpcconf, "hello-client")
    val rpcenv: rpcenv = nettyrpcenvfactory.create(config)
    val endpointref: rpcendpointref = rpcenv.setupendpointref(rpcaddress("localhost", 52345), "hello-service")
    val future: future[string] = endpointref.ask[string](sayhi("neo"))
    future.oncomplete {
      case scala.util.success(value) => println(s"got the result = $value")
      case scala.util.failure(e) => println(s"got error: $e")
    }
    await.result(future, duration.apply("30s"))
    rpcenv.shutdown()
  }
  ......
}

Spark RPC框架源码分析(二)RPC运行时序

创建spark rpc客户端env(即nettyrpcenvfactory)部分和spark rpc服务端是一样的,只是不会开启监听服务,这里就不详细展开。

我们从这一句开始看,这也是spark rpc客户端和服务端区别的地方所在。

val endpointref: rpcendpointref = rpcenv.setupendpointref(rpcaddress("localhost", 52345), "hello-service")

setupendpointref()

上面的的setupendpointref最终会去调用下面setupendpointref()这个方法,这个方法中又进行一次跳转,跳转去setupendpointrefbyuri这个方法中。需要注意的是这两个方法都是rpcenv里面的,而rpcenv是抽象类,它里面只实现部分方法,而nettyrpcenv继承了它,实现了全部方法。

abstract class rpcenv(conf: rpcconf) {
  ......
  def setupendpointref(address: rpcaddress, endpointname: string): rpcendpointref = {
    //会跳转去调用下面的方法
    setupendpointrefbyuri(rpcendpointaddress(address, endpointname).tostring)
  }
  
  def setupendpointrefbyuri(uri: string): rpcendpointref = {
    //其中 asyncsetupendpointrefbyuri() 返回的是 future[rpcendpointref]。 这里就是阻塞,等待返回一个 rpcendpointref。
    // defaultlookuptimeout.awaitresult 底层调用 await.result 阻塞 直到结果返回或返回异常
    defaultlookuptimeout.awaitresult(asyncsetupendpointrefbyuri(uri))
  }
  ......
}  

这里最主要的代码其实就一句,

defaultlookuptimeout.awaitresult(asyncsetupendpointrefbyuri(uri))

这一段可以分为两部分,第一部分的defaultlookuptimeout.awaitresult其实底层是调用await.result阻塞等待一个异步操作,直到结果返回。

而asyncsetupendpointrefbyuri(uri)则是根据给定的uri去返回一个rpcendpointref,它是在nettyrpcenv中实现的:

class nettyrpcenv(
                   val conf: rpcconf,
                   javaserializerinstance: javaserializerinstance,
                   host: string) extends rpcenv(conf) {
  ......
  def asyncsetupendpointrefbyuri(uri: string): future[rpcendpointref] = {
    //获取地址
    val addr = rpcendpointaddress(uri)
    //根据地址等信息新建一个 nettyrpcendpointref 。
    val rpcendpointref = new nettyrpcendpointref(conf, addr, this) 
    //每个新建的 rpcendpointref 都有先有一个对应的verifier 去检查服务端存不存在对应的 rpcendpoint 。  
    val verifier = new nettyrpcendpointref(
      conf, rpcendpointaddress(addr.rpcaddress, rpcendpointverifier.name), this)
    //向服务端发送请求判断是否存在对应的 rpcendpoint。
    verifier.ask[boolean](rpcendpointverifier.createcheckexistence(endpointref.name)).flatmap { find =>
      if (find) {
        future.successful(endpointref)
      } else {
        future.failed(new rpcendpointnotfoundexception(uri))
      }
    }(threadutils.samethread)
  }
  ......
}
  

asyncsetupendpointrefbyuri()这个方法实现两个功能,第一个就是新建一个rpcendpointref。第二个是新建一个verifier,这个verifier的作用就是先给服务端发送一个请求判断是否存在rpcendpointref对应的rpcendpoint。

这段代码中最重要的就是verifiter.ask[boolean](...)了。如果有找到之后就会调用future.successful这个方法,反之则会通过future.failed抛出一个异常。

ask可以算是比较核心的一个方法,我们可以到ask方法中去看看。

class nettyrpcenv{
    ......
    private[netty] def ask[t: classtag](message: requestmessage, timeout: rpctimeout): future[t] = {
      val promise = promise[any]()
      val remoteaddr = message.receiver.address
      //
      def onfailure(e: throwable): unit = {
  //      println("555");
        if (!promise.tryfailure(e)) {
          log.warn(s"ignored failure: $e")
        }
      }
  
      def onsuccess(reply: any): unit = reply match {
        case rpcfailure(e) => onfailure(e)
        case rpcreply =>
          println("666");
          if (!promise.trysuccess(rpcreply)) {
            log.warn(s"ignored message: $reply")
          }
      }
  
      try {
        if (remoteaddr == address) {
          val p = promise[any]()
          p.future.oncomplete {
            case success(response) => onsuccess(response)
            case failure(e) => onfailure(e)
          }(threadutils.samethread)
          dispatcher.postlocalmessage(message, p)
        } else {
          //跳转到这里执行
          //封装一个 rpcoutboxmessage ,同时 onsuccess 方法也是在这里注册的。
          val rpcmessage = rpcoutboxmessage(serialize(message),
            onfailure,
            (client, response) => onsuccess(deserialize[any](client, response)))
          posttooutbox(message.receiver, rpcmessage)
          promise.future.onfailure {
            case _: timeoutexception =>  println("111");rpcmessage.ontimeout()
  //          case _ => println("222");
          }(threadutils.samethread)
        }
        
        val timeoutcancelable = timeoutscheduler.schedule(new runnable {
          override def run(): unit = {
  //          println("333");
            onfailure(new timeoutexception(s"cannot receive any reply in ${timeout.duration}"))
          }
        }, timeout.duration.tonanos, timeunit.nanoseconds)
        //promise 对应的 future oncomplete时会去调用,但当 successful 的时候,上面的 run 并不会被调用。
        promise.future.oncomplete { v =>
  //        println("4444");
          timeoutcancelable.cancel(true)
        }(threadutils.samethread)
  
      } catch {
        case nonfatal(e) =>
          onfailure(e)
      }
  
      promise.future.mapto[t].recover(timeout.addmessageiftimeout)(threadutils.samethread)
    }
    ......
}

这里涉及到使用一些scala多线程的高级用法,包括promise和future。如果想要对这些有更加深入的了解,。

这个函数的作用从名字中就可以看得出,其实就是将要发送的消息封装成一个rpcoutboxmessage,然后交给outbox去发送,outbox和前面所说的inbox对应,对应actor模型中的mailbox(信箱)。用于发送和接收消息。

其中使用到了future和promise进行异步并发以及错误处理,比如当发送时间超时的时候promise就会返回一个timeoutexception,而我们就可以设置自己的onfailure函数去处理这些异常。

ok,注册完rpcendpointref后我们便可以用它来向服务端发送消息了,而其实rpcendpointref发送消息还是调用ask方法,就是上面的那个ask方法。上面也有介绍,本质上就是通过outbox进行处理。

我们来梳理一下rpc的客户端的发送流程。

客户端逻辑小结:客户端和服务端比较类似,都是需要创建一个nettyrpcenv。不同的是接下来客户端创建的是rpcendpointref,并用之向服务端对应的rpcendpoint发送消息。

1.nettyrpcenvfactory创建nettyrpcenv

  • 根据地址创建nettyrpcenv。根据地址开启监听服务,向dispacther注册一个rpcendpointverifier等待。

2. 创建rpcendpointref

  • 创建一个新的rpcendpointref
  • 创建对应的verifier,使用verifier向服务端发送请求,判断对应的rpcendpoint是否存在。若存在,返回该rpcendpointref,否则抛出异常。

3. rpcendpointref使用同步或者异步的方式发送请求。

ok,以上就是sparkrpc时序的源码分析。下一篇会将一个实际的例子,spark的心跳机制和代码。喜欢的话就关注一波吧


推荐阅读 :
从分治算法到 mapreduce
actor并发编程模型浅析
大数据存储的进化史 --从 raid 到 hadoop hdfs