欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

Android okhttp的启动流程及源码解析

程序员文章站 2022-07-02 19:32:46
前言这篇文章主要讲解了okhttp的主要工作流程以及源码的解析。什么是okhttp简单来说 okhttp 就是一个客户端用来发送 http 消息并对服务器的响应做出处理的应用层框架。 那么它有什么优点...

前言

这篇文章主要讲解了okhttp的主要工作流程以及源码的解析。

什么是okhttp

简单来说 okhttp 就是一个客户端用来发送 http 消息并对服务器的响应做出处理的应用层框架。 那么它有什么优点呢?

  • 易使用、易扩展。
  • 支持 http/2 协议,允许对同一主机的所有请求共用同一个 socket 连接。
  • 如果 http/2 不可用, 使用连接池复用减少请求延迟。
  • 支持 gzip,减小了下载大小。
  • 支持缓存处理,可以避免重复请求。
  • 如果你的服务有多个 ip 地址,当第一次连接失败,okhttp 会尝试备用地址。
  • okhttp 还处理了代理服务器问题和ssl握手失败问题。

okhttp是如何做网络请求的

1.它是如何使用的?

1.1 通过构造者模式添加 url,method,header,body 等完成一个请求的信息 request 对象

  val request = request.builder()
    .url("")
    .addheader("","")
    .get()
    .build()

1.2 同样通过构造者模式创建一个 okhttpclicent 实例,可以按需配置

  val okhttpclient = okhttpclient.builder()
    .connecttimeout(15, timeunit.seconds)
    .readtimeout(15, timeunit.seconds)
    .addinterceptor()
    .build()

1.3 创建 call 并且发起网络请求

 val newcall = okhttpclient.newcall(request)
 //异步请求数据
 newcall.enqueue(object :callback{
 override fun onfailure(call: call, e: ioexception) {}
 override fun onresponse(call: call, response: response) {}
 })
 //同步请求数据
 val response = newcall.execute()

整个使用流程很简单,主要的地方在于如何通过 call 对象发起同/异步请求,后续的源码追踪以方法开始。

2.如何通过 call 发起请求?

2.1 call 是什么

 /** prepares the [request] to be executed at some point in the future. */
 override fun newcall(request: request): call = realcall(this, request, forwebsocket = false)

2.2 发起请求-异步请求

//realcall#enqueue(responsecallback: callback) 

 override fun enqueue(responsecallback: callback) {
 synchronized(this) {
  //检查这个call是否执行过,每个 call 只能被执行一次
  check(!executed) { "already executed" }
  executed = true
 }
 //此方法调用了eventlistener#callstart(call: call),
 主要是用来监视应用程序的http调用的数量,大小和各个阶段的耗时
 callstart()
 //创建asynccall,实际是个runnable
 client.dispatcher.enqueue(asynccall(responsecallback))
 }

enqueue 最后一个方法分为两步

  • 第一步将响应的回调放入 asynccall 对象中 ,asynccall 对象是 realcall 的一个内部类实现了 runnable 接口。
  • 第二步通过 dispatcher 类的 enqueue() 将 asynccall 对象传入
//dispatcher#enqueue(call: asynccall)

 /** ready async calls in the order they'll be run. */
 private val readyasynccalls = arraydeque<asynccall>()
 
 internal fun enqueue(call: asynccall) {
 synchronized(this) {
  //将call添加到即将运行的异步队列
  readyasynccalls.add(call)
  ...
  promoteandexecute()
 }

//dispatcher#promoteandexecute() 
//将[readyasynccalls]过渡到[runningasynccalls]

 private fun promoteandexecute(): boolean {
 ...
 for (i in 0 until executablecalls.size) {
  val asynccall = executablecalls[i]
  //这里就是通过 executorservice 执行 run()
  asynccall.executeon(executorservice)
 }
 return isrunning
 }

//realcall.kt中的内部类

 internal inner class asynccall(
 private val responsecallback: callback
 ) : runnable {
 fun executeon(executorservice: executorservice) {
  ...
  //执行runnable
  executorservice.execute(this)
  ...
 	}
 	
 override fun run() {
  threadname("okhttp ${redactedurl()}") {
  ...
  try {
   //兜兜转转 终于调用这个关键方法了
   val response = getresponsewithinterceptorchain()
   signalledcallback = true
   //通过之前传入的接口回调数据
   responsecallback.onresponse(this@realcall, response)
  } catch (e: ioexception) {
   if (signalledcallback) {
   platform.get().log("callback failure for ${tologgablestring()}", platform.info, e)
   } else {
   responsecallback.onfailure(this@realcall, e)
   }
  } catch (t: throwable) {
   cancel()
   if (!signalledcallback) {
   val canceledexception = ioexception("canceled due to $t")
   canceledexception.addsuppressed(t)
   responsecallback.onfailure(this@realcall, canceledexception)
   }
   throw t
  } finally {
   //移除队列
   client.dispatcher.finished(this)
  }
  }
 }
 }

2.3 同步请求 realcall#execute()

 override fun execute(): response {
 //同样判断是否执行过
 synchronized(this) {
  check(!executed) { "already executed" }
  executed = true
 }
 timeout.enter()
 //同样监听
 callstart()
 try {
  //同样执行
  client.dispatcher.executed(this)
  return getresponsewithinterceptorchain()
 } finally {
  //同样移除
  client.dispatcher.finished(this)
 }
 }

3.如何通过拦截器处理请求和响应?

无论同异步请求都会调用到 getresponsewithinterceptorchain() ,这个方法主要使用责任链模式将整个请求分为几个拦截器调用 ,简化了各自的责任和逻辑,可以扩展其它拦截器,看懂了拦截器 okhttp 就了解的差不多了。

 @throws(ioexception::class)
 internal fun getresponsewithinterceptorchain(): response {
 // 构建完整的拦截器
 val interceptors = mutablelistof<interceptor>()
 interceptors += client.interceptors     //用户自己拦截器,数据最开始和最后
 interceptors += retryandfollowupinterceptor(client) //失败后的重试和重定向
 interceptors += bridgeinterceptor(client.cookiejar) //桥接用户的信息和服务器的信息
 interceptors += cacheinterceptor(client.cache)   //处理缓存相关
 interceptors += connectinterceptor      //负责与服务器连接
 if (!forwebsocket) {
  interceptors += client.networkinterceptors   //配置 okhttpclient 时设置,数据未经处理
 }
 interceptors += callserverinterceptor(forwebsocket) //负责向服务器发送请求数据、从服务器读取响应数据
 //创建拦截链
 val chain = realinterceptorchain(
  call = this,
  interceptors = interceptors,
  index = 0,
  exchange = null,
  request = originalrequest,
  connecttimeoutmillis = client.connecttimeoutmillis,
  readtimeoutmillis = client.readtimeoutmillis,
  writetimeoutmillis = client.writetimeoutmillis
 )

 var callednomoreexchanges = false
 try {
  //拦截链的执行
  val response = chain.proceed(originalrequest)
 	 ...
 } catch (e: ioexception) {
	 ...
 } finally {
	 ...
 }
 }

//1.realinterceptorchain#proceed(request: request)
@throws(ioexception::class)
 override fun proceed(request: request): response {
 ...
 // copy出新的拦截链,链中的拦截器集合index+1
 val next = copy(index = index + 1, request = request)
 val interceptor = interceptors[index]

 //调用拦截器的intercept(chain: chain): response 返回处理后的数据 交由下一个拦截器处理
 @suppress("useless_elvis")
 val response = interceptor.intercept(next) ?: throw nullpointerexception(
  "interceptor $interceptor returned null")
 ...
 //返回最终的响应体
 return response
 }

拦截器开始操作 request。

3.1 拦截器是怎么拦截的?

拦截器都继承自 interceptor 类并实现了 fun intercept(chain: chain): response 方法。
在 intercept 方法里传入 chain 对象 调用它的 proceed() 然后 proceed() 方法里又 copy 下一个拦截器,然后双调用了 intercept(chain: chain) 接着叒 chain.proceed(request) 直到最后一个拦截器 return response 然后一层一层向上反馈数据。

3.2 retryandfollowupinterceptor

这个拦截器是用来处理重定向的后续请求和失败重试,也就是说一般第一次发起请求不需要重定向会调用下一个拦截器。

@throws(ioexception::class)
 override fun intercept(chain: interceptor.chain): response {
 val realchain = chain as realinterceptorchain
 var request = chain.request
 val call = realchain.call
 var followupcount = 0
 var priorresponse: response? = null
 var newexchangefinder = true
 var recoveredfailures = listof<ioexception>()
 while (true) {
  ...//在调用下一个拦截器前的操作
  var response: response
  try {
  ...
  try {
   //调用下一个拦截器
   response = realchain.proceed(request)
   newexchangefinder = true
  } catch (e: routeexception) {
  ...
   continue
  } catch (e: ioexception) {
   ...
   continue
  }

  ...
  //处理上一个拦截器返回的 response
  val followup = followuprequest(response, exchange)
	 ...
	 //中间有一些判断是否需要重新请求 不需要则返回 response
	 //处理之后重新请求 request
  request = followup
  priorresponse = response
  } finally {
  call.exitnetworkinterceptorexchange(closeactiveexchange)
  }
 }
 }
 
 @throws(ioexception::class)
 private fun followuprequest(userresponse: response, exchange: exchange?): request? {
 val route = exchange?.connection?.route()
 val responsecode = userresponse.code

 val method = userresponse.request.method
 when (responsecode) {
	 //3xx 重定向
  http_perm_redirect, http_temp_redirect, http_mult_choice, http_moved_perm, http_moved_temp, http_see_other -> {
  //这个方法重新 构建了 request 用于重新请求
  return buildredirectrequest(userresponse, method)
  }
	 ... 省略一部分code
  else -> return null
 }
 }

在 followuprequest(userresponse: response, exchange: exchange?): request? 方法中判断了 response 中的服务器响应码做出了不同的操作。

3.3 bridgeinterceptor

它负责对于 http 的额外预处理,比如 content-length 的计算和添加、 gzip 的⽀持(accept-encoding: gzip)、 gzip 压缩数据的解包等,这个类比较简单就不贴代码了,想了解的话可以自行查看。

3.4 cacheinterceptor

这个类负责 cache 的处理,如果本地有了可⽤的 cache,⼀个请求可以在没有发⽣实质⽹络交互的情况下就返回缓存结果,实现如下。

 @throws(ioexception::class)
 override fun intercept(chain: interceptor.chain): response {
 //在cache(disklrucache)类中 通过request.url匹配response
 val cachecandidate = cache?.get(chain.request())
 //记录当前时间点
 val now = system.currenttimemillis()
 //缓存策略 有两种类型 
 //networkrequest 网络请求
 //cacheresponse 缓存的响应
 val strategy = cachestrategy.factory(now, chain.request(), cachecandidate).compute()
 val networkrequest = strategy.networkrequest
 val cacheresponse = strategy.cacheresponse
 //计算请求次数和缓存次数
 cache?.trackresponse(strategy)
 ...
 // 如果 禁止使用网络 并且 缓存不足,返回504和空body的response
 if (networkrequest == null && cacheresponse == null) {
  return response.builder()
   .request(chain.request())
   .protocol(protocol.http_1_1)
   .code(http_gateway_timeout)
   .message("unsatisfiable request (only-if-cached)")
   .body(empty_response)
   .sentrequestatmillis(-1l)
   .receivedresponseatmillis(system.currenttimemillis())
   .build()
 }

 // 如果策略中不能使用网络,就把缓存中的response封装返回
 if (networkrequest == null) {
  return cacheresponse!!.newbuilder()
   .cacheresponse(stripbody(cacheresponse))
   .build()
 }
 //调用拦截器process从网络获取数据
 var networkresponse: response? = null
 try {
  networkresponse = chain.proceed(networkrequest)
 } finally {
  // if we're crashing on i/o or otherwise, don't leak the cache body.
  if (networkresponse == null && cachecandidate != null) {
  cachecandidate.body?.closequietly()
  }
 }
 
 //如果有缓存的response
 if (cacheresponse != null) {
  //如果网络请求返回code为304 即说明资源未修改
  if (networkresponse?.code == http_not_modified) {
  //直接封装封装缓存的response返回即可
  val response = cacheresponse.newbuilder()
   .headers(combine(cacheresponse.headers, networkresponse.headers))
   .sentrequestatmillis(networkresponse.sentrequestatmillis)
   .receivedresponseatmillis(networkresponse.receivedresponseatmillis)
   .cacheresponse(stripbody(cacheresponse))
   .networkresponse(stripbody(networkresponse))
   .build()

  networkresponse.body!!.close()

  // update the cache after combining headers but before stripping the
  // content-encoding header (as performed by initcontentstream()).
  cache!!.trackconditionalcachehit()
  cache.update(cacheresponse, response)
  return response
  } else {
  cacheresponse.body?.closequietly()
  }
 }

 val response = networkresponse!!.newbuilder()
  .cacheresponse(stripbody(cacheresponse))
  .networkresponse(stripbody(networkresponse))
  .build()

 if (cache != null) {
  //判断是否具有主体 并且 是否可以缓存供后续使用
  if (response.promisesbody() && cachestrategy.iscacheable(response, networkrequest)) {
  // 加入缓存中
  val cacherequest = cache.put(response)
  return cachewritingresponse(cacherequest, response)
  }
  //如果请求方法无效 就从缓存中remove掉
  if (httpmethod.invalidatescache(networkrequest.method)) {
  try {
   cache.remove(networkrequest)
  } catch (_: ioexception) {
   // the cache cannot be written.
  }
  }
 }
 
 return response
 }

3.5 connectinterceptor

此类负责建⽴连接。 包含了⽹络请求所需要的 tcp 连接(http),或者 tcp 之前的 tls 连接(https),并且会创建出对应的 httpcodec 对象(⽤于编码解码 http 请求)。

@throws(ioexception::class)
 override fun intercept(chain: interceptor.chain): response {
 val realchain = chain as realinterceptorchain
 val exchange = realchain.call.initexchange(chain)
 val connectedchain = realchain.copy(exchange = exchange)
 return connectedchain.proceed(realchain.request)
 }

看似短短四行实际工作还是比较多的。

/** finds a new or pooled connection to carry a forthcoming request and response. */
 internal fun initexchange(chain: realinterceptorchain): exchange {
	...
	//codec是对 http 协议操作的抽象,有两个实现:http1codec和http2codec,对应 http/1.1 和 http/2。
 val codec = exchangefinder.find(client, chain)
 val result = exchange(this, eventlistener, exchangefinder, codec)
 ...
 return result
 }
 
 #exchangefinder.find
 fun find(client: okhttpclient,chain: realinterceptorchain):exchangecodec {
 try {
  //寻找一个可用的连接
  val resultconnection = findhealthyconnection(
   connecttimeout = chain.connecttimeoutmillis,
   readtimeout = chain.readtimeoutmillis,
   writetimeout = chain.writetimeoutmillis,
   pingintervalmillis = client.pingintervalmillis,
   connectionretryenabled = client.retryonconnectionfailure,
   doextensivehealthchecks = chain.request.method != "get"
  )
  return resultconnection.newcodec(client, chain)
 } catch (e: routeexception) {
  trackfailure(e.lastconnectexception)
  throw e
 } catch (e: ioexception) {
  trackfailure(e)
  throw routeexception(e)
 }
 }
 
 @throws(ioexception::class)
 private fun findhealthyconnection(
 connecttimeout: int,
 readtimeout: int,
 writetimeout: int,
 pingintervalmillis: int,
 connectionretryenabled: boolean,
 doextensivehealthchecks: boolean
 ): realconnection {
 while (true) {
 	//寻找连接
  val candidate = findconnection(
   connecttimeout = connecttimeout,
   readtimeout = readtimeout,
   writetimeout = writetimeout,
   pingintervalmillis = pingintervalmillis,
   connectionretryenabled = connectionretryenabled
  )

  //确认找到的连接可用并返回
  if (candidate.ishealthy(doextensivehealthchecks)) {
  return candidate
  }
	 ...
  throw ioexception("exhausted all routes")
 }
 }
 
 @throws(ioexception::class)
 private fun findconnection(
 connecttimeout: int,
 readtimeout: int,
 writetimeout: int,
 pingintervalmillis: int,
 connectionretryenabled: boolean
 ): realconnection {
 if (call.iscanceled()) throw ioexception("canceled")

 // 1. 尝试重用这个call的连接 比如重定向需要再次请求 那么这里就会重用之前的连接
 val callconnection = call.connection
 if (callconnection != null) {
  var toclose: socket? = null
  synchronized(callconnection) {
  if (callconnection.nonewexchanges || !samehostandport(callconnection.route().address.url)) {
   toclose = call.releaseconnectionnoevents()
  }
  }
	 //返回这个连接
  if (call.connection != null) {
  check(toclose == null)
  return callconnection
  }

  // the call's connection was released.
  toclose?.closequietly()
  eventlistener.connectionreleased(call, callconnection)
 }
	...
 // 2. 尝试从连接池中找一个连接 找到就返回连接
 if (connectionpool.callacquirepooledconnection(address, call, null, false)) {
  val result = call.connection!!
  eventlistener.connectionacquired(call, result)
  return result
 }

 // 3. 如果连接池中没有 计算出下一次要尝试的路由
 val routes: list<route>?
 val route: route
 if (nextroutetotry != null) {
  // use a route from a preceding coalesced connection.
  routes = null
  route = nextroutetotry!!
  nextroutetotry = null
 } else if (routeselection != null && routeselection!!.hasnext()) {
  // use a route from an existing route selection.
  routes = null
  route = routeselection!!.next()
 } else {
  // compute a new route selection. this is a blocking operation!
  var localrouteselector = routeselector
  if (localrouteselector == null) {
  localrouteselector = routeselector(address, call.client.routedatabase, call, eventlistener)
  this.routeselector = localrouteselector
  }
  val localrouteselection = localrouteselector.next()
  routeselection = localrouteselection
  routes = localrouteselection.routes

  if (call.iscanceled()) throw ioexception("canceled")

  // now that we have a set of ip addresses, make another attempt at getting a connection from
  // the pool. we have a better chance of matching thanks to connection coalescing.
  if (connectionpool.callacquirepooledconnection(address, call, routes, false)) {
  val result = call.connection!!
  eventlistener.connectionacquired(call, result)
  return result
  }

  route = localrouteselection.next()
 }

 // connect. tell the call about the connecting call so async cancels work.
 // 4.到这里还没有找到可用的连接 但是找到了 route 即路由 进行socket/tls连接
 val newconnection = realconnection(connectionpool, route)
 call.connectiontocancel = newconnection
 try {
  newconnection.connect(
   connecttimeout,
   readtimeout,
   writetimeout,
   pingintervalmillis,
   connectionretryenabled,
   call,
   eventlistener
  )
 } finally {
  call.connectiontocancel = null
 }
 call.client.routedatabase.connected(newconnection.route())

 // if we raced another call connecting to this host, coalesce the connections. this makes for 3
 // different lookups in the connection pool!
 // 4.查找是否有多路复用(http2)的连接,有就返回
 if (connectionpool.callacquirepooledconnection(address, call, routes, true)) {
  val result = call.connection!!
  nextroutetotry = route
  newconnection.socket().closequietly()
  eventlistener.connectionacquired(call, result)
  return result
 }

 synchronized(newconnection) {
  //放入连接池中
  connectionpool.put(newconnection)
  call.acquireconnectionnoevents(newconnection)
 }

 eventlistener.connectionacquired(call, newconnection)
 return newconnection
 }
 

接下来看看是如何建立连接的

fun connect(
 connecttimeout: int,
 readtimeout: int,
 writetimeout: int,
 pingintervalmillis: int,
 connectionretryenabled: boolean,
 call: call,
 eventlistener: eventlistener
 ) {
 ...
 while (true) {
  try {
  if (route.requirestunnel()) {
   //创建tunnel,用于通过http代理访问https
   //其中包含connectsocket、createtunnel
   connecttunnel(connecttimeout, readtimeout, writetimeout, call, eventlistener)
   if (rawsocket == null) {
   // we were unable to connect the tunnel but properly closed down our resources.
   break
   }
  } else {
   //不创建tunnel就创建socket连接 获取到数据流
   connectsocket(connecttimeout, readtimeout, call, eventlistener)
  }
  //建立协议连接tsl
  establishprotocol(connectionspecselector, pingintervalmillis, call, eventlistener)
  eventlistener.connectend(call, route.socketaddress, route.proxy, protocol)
  break
  } catch (e: ioexception) {
  ...
  }
 }
	...
 }

建立tsl连接

@throws(ioexception::class)
 private fun establishprotocol(
 connectionspecselector: connectionspecselector,
 pingintervalmillis: int,
 call: call,
 eventlistener: eventlistener
 ) {
 	//ssl为空 即http请求 明文请求
 if (route.address.sslsocketfactory == null) {
  if (protocol.h2_prior_knowledge in route.address.protocols) {
  socket = rawsocket
  protocol = protocol.h2_prior_knowledge
  starthttp2(pingintervalmillis)
  return
  }

  socket = rawsocket
  protocol = protocol.http_1_1
  return
 }
	//否则为https请求 需要连接sslsocket 验证证书是否可被服务器接受 保存tsl返回的信息
 eventlistener.secureconnectstart(call)
 connecttls(connectionspecselector)
 eventlistener.secureconnectend(call, handshake)

 if (protocol === protocol.http_2) {
  starthttp2(pingintervalmillis)
 }
 }

至此,创建好了连接,返回到最开始的 find() 方法返回 exchangecodec 对象,再包装为 exchange 对象用来下一个拦截器操作。

3.6 callserverinterceptor

这个类负责实质的请求与响应的 i/o 操作,即往 socket ⾥写⼊请求数据,和从 socket ⾥读取响应数据。

总结

用一张 @piasy 的图来做总结,图很干练结构也很清晰。

Android okhttp的启动流程及源码解析

以上就是android okhttp的启动流程及源码解析的详细内容,更多关于android okhttp的启动流程的资料请关注其它相关文章!