Android okhttp的启动流程及源码解析
前言
这篇文章主要讲解了okhttp的主要工作流程以及源码的解析。
什么是okhttp
简单来说 okhttp 就是一个客户端用来发送 http 消息并对服务器的响应做出处理的应用层框架。 那么它有什么优点呢?
- 易使用、易扩展。
- 支持 http/2 协议,允许对同一主机的所有请求共用同一个 socket 连接。
- 如果 http/2 不可用, 使用连接池复用减少请求延迟。
- 支持 gzip,减小了下载大小。
- 支持缓存处理,可以避免重复请求。
- 如果你的服务有多个 ip 地址,当第一次连接失败,okhttp 会尝试备用地址。
- okhttp 还处理了代理服务器问题和ssl握手失败问题。
okhttp是如何做网络请求的
1.它是如何使用的?
1.1 通过构造者模式添加 url,method,header,body 等完成一个请求的信息 request 对象
val request = request.builder() .url("") .addheader("","") .get() .build()
1.2 同样通过构造者模式创建一个 okhttpclicent 实例,可以按需配置
val okhttpclient = okhttpclient.builder() .connecttimeout(15, timeunit.seconds) .readtimeout(15, timeunit.seconds) .addinterceptor() .build()
1.3 创建 call 并且发起网络请求
val newcall = okhttpclient.newcall(request) //异步请求数据 newcall.enqueue(object :callback{ override fun onfailure(call: call, e: ioexception) {} override fun onresponse(call: call, response: response) {} }) //同步请求数据 val response = newcall.execute()
整个使用流程很简单,主要的地方在于如何通过 call 对象发起同/异步请求,后续的源码追踪以方法开始。
2.如何通过 call 发起请求?
2.1 call 是什么
/** prepares the [request] to be executed at some point in the future. */ override fun newcall(request: request): call = realcall(this, request, forwebsocket = false)
2.2 发起请求-异步请求
//realcall#enqueue(responsecallback: callback) override fun enqueue(responsecallback: callback) { synchronized(this) { //检查这个call是否执行过,每个 call 只能被执行一次 check(!executed) { "already executed" } executed = true } //此方法调用了eventlistener#callstart(call: call), 主要是用来监视应用程序的http调用的数量,大小和各个阶段的耗时 callstart() //创建asynccall,实际是个runnable client.dispatcher.enqueue(asynccall(responsecallback)) }
enqueue 最后一个方法分为两步
- 第一步将响应的回调放入 asynccall 对象中 ,asynccall 对象是 realcall 的一个内部类实现了 runnable 接口。
- 第二步通过 dispatcher 类的 enqueue() 将 asynccall 对象传入
//dispatcher#enqueue(call: asynccall) /** ready async calls in the order they'll be run. */ private val readyasynccalls = arraydeque<asynccall>() internal fun enqueue(call: asynccall) { synchronized(this) { //将call添加到即将运行的异步队列 readyasynccalls.add(call) ... promoteandexecute() } //dispatcher#promoteandexecute() //将[readyasynccalls]过渡到[runningasynccalls] private fun promoteandexecute(): boolean { ... for (i in 0 until executablecalls.size) { val asynccall = executablecalls[i] //这里就是通过 executorservice 执行 run() asynccall.executeon(executorservice) } return isrunning } //realcall.kt中的内部类 internal inner class asynccall( private val responsecallback: callback ) : runnable { fun executeon(executorservice: executorservice) { ... //执行runnable executorservice.execute(this) ... } override fun run() { threadname("okhttp ${redactedurl()}") { ... try { //兜兜转转 终于调用这个关键方法了 val response = getresponsewithinterceptorchain() signalledcallback = true //通过之前传入的接口回调数据 responsecallback.onresponse(this@realcall, response) } catch (e: ioexception) { if (signalledcallback) { platform.get().log("callback failure for ${tologgablestring()}", platform.info, e) } else { responsecallback.onfailure(this@realcall, e) } } catch (t: throwable) { cancel() if (!signalledcallback) { val canceledexception = ioexception("canceled due to $t") canceledexception.addsuppressed(t) responsecallback.onfailure(this@realcall, canceledexception) } throw t } finally { //移除队列 client.dispatcher.finished(this) } } } }
2.3 同步请求 realcall#execute()
override fun execute(): response { //同样判断是否执行过 synchronized(this) { check(!executed) { "already executed" } executed = true } timeout.enter() //同样监听 callstart() try { //同样执行 client.dispatcher.executed(this) return getresponsewithinterceptorchain() } finally { //同样移除 client.dispatcher.finished(this) } }
3.如何通过拦截器处理请求和响应?
无论同异步请求都会调用到 getresponsewithinterceptorchain() ,这个方法主要使用责任链模式将整个请求分为几个拦截器调用 ,简化了各自的责任和逻辑,可以扩展其它拦截器,看懂了拦截器 okhttp 就了解的差不多了。
@throws(ioexception::class) internal fun getresponsewithinterceptorchain(): response { // 构建完整的拦截器 val interceptors = mutablelistof<interceptor>() interceptors += client.interceptors //用户自己拦截器,数据最开始和最后 interceptors += retryandfollowupinterceptor(client) //失败后的重试和重定向 interceptors += bridgeinterceptor(client.cookiejar) //桥接用户的信息和服务器的信息 interceptors += cacheinterceptor(client.cache) //处理缓存相关 interceptors += connectinterceptor //负责与服务器连接 if (!forwebsocket) { interceptors += client.networkinterceptors //配置 okhttpclient 时设置,数据未经处理 } interceptors += callserverinterceptor(forwebsocket) //负责向服务器发送请求数据、从服务器读取响应数据 //创建拦截链 val chain = realinterceptorchain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalrequest, connecttimeoutmillis = client.connecttimeoutmillis, readtimeoutmillis = client.readtimeoutmillis, writetimeoutmillis = client.writetimeoutmillis ) var callednomoreexchanges = false try { //拦截链的执行 val response = chain.proceed(originalrequest) ... } catch (e: ioexception) { ... } finally { ... } } //1.realinterceptorchain#proceed(request: request) @throws(ioexception::class) override fun proceed(request: request): response { ... // copy出新的拦截链,链中的拦截器集合index+1 val next = copy(index = index + 1, request = request) val interceptor = interceptors[index] //调用拦截器的intercept(chain: chain): response 返回处理后的数据 交由下一个拦截器处理 @suppress("useless_elvis") val response = interceptor.intercept(next) ?: throw nullpointerexception( "interceptor $interceptor returned null") ... //返回最终的响应体 return response }
拦截器开始操作 request。
3.1 拦截器是怎么拦截的?
拦截器都继承自 interceptor 类并实现了 fun intercept(chain: chain): response 方法。
在 intercept 方法里传入 chain 对象 调用它的 proceed() 然后 proceed() 方法里又 copy 下一个拦截器,然后双调用了 intercept(chain: chain) 接着叒 chain.proceed(request) 直到最后一个拦截器 return response 然后一层一层向上反馈数据。
3.2 retryandfollowupinterceptor
这个拦截器是用来处理重定向的后续请求和失败重试,也就是说一般第一次发起请求不需要重定向会调用下一个拦截器。
@throws(ioexception::class) override fun intercept(chain: interceptor.chain): response { val realchain = chain as realinterceptorchain var request = chain.request val call = realchain.call var followupcount = 0 var priorresponse: response? = null var newexchangefinder = true var recoveredfailures = listof<ioexception>() while (true) { ...//在调用下一个拦截器前的操作 var response: response try { ... try { //调用下一个拦截器 response = realchain.proceed(request) newexchangefinder = true } catch (e: routeexception) { ... continue } catch (e: ioexception) { ... continue } ... //处理上一个拦截器返回的 response val followup = followuprequest(response, exchange) ... //中间有一些判断是否需要重新请求 不需要则返回 response //处理之后重新请求 request request = followup priorresponse = response } finally { call.exitnetworkinterceptorexchange(closeactiveexchange) } } } @throws(ioexception::class) private fun followuprequest(userresponse: response, exchange: exchange?): request? { val route = exchange?.connection?.route() val responsecode = userresponse.code val method = userresponse.request.method when (responsecode) { //3xx 重定向 http_perm_redirect, http_temp_redirect, http_mult_choice, http_moved_perm, http_moved_temp, http_see_other -> { //这个方法重新 构建了 request 用于重新请求 return buildredirectrequest(userresponse, method) } ... 省略一部分code else -> return null } }
在 followuprequest(userresponse: response, exchange: exchange?): request? 方法中判断了 response 中的服务器响应码做出了不同的操作。
3.3 bridgeinterceptor
它负责对于 http 的额外预处理,比如 content-length 的计算和添加、 gzip 的⽀持(accept-encoding: gzip)、 gzip 压缩数据的解包等,这个类比较简单就不贴代码了,想了解的话可以自行查看。
3.4 cacheinterceptor
这个类负责 cache 的处理,如果本地有了可⽤的 cache,⼀个请求可以在没有发⽣实质⽹络交互的情况下就返回缓存结果,实现如下。
@throws(ioexception::class) override fun intercept(chain: interceptor.chain): response { //在cache(disklrucache)类中 通过request.url匹配response val cachecandidate = cache?.get(chain.request()) //记录当前时间点 val now = system.currenttimemillis() //缓存策略 有两种类型 //networkrequest 网络请求 //cacheresponse 缓存的响应 val strategy = cachestrategy.factory(now, chain.request(), cachecandidate).compute() val networkrequest = strategy.networkrequest val cacheresponse = strategy.cacheresponse //计算请求次数和缓存次数 cache?.trackresponse(strategy) ... // 如果 禁止使用网络 并且 缓存不足,返回504和空body的response if (networkrequest == null && cacheresponse == null) { return response.builder() .request(chain.request()) .protocol(protocol.http_1_1) .code(http_gateway_timeout) .message("unsatisfiable request (only-if-cached)") .body(empty_response) .sentrequestatmillis(-1l) .receivedresponseatmillis(system.currenttimemillis()) .build() } // 如果策略中不能使用网络,就把缓存中的response封装返回 if (networkrequest == null) { return cacheresponse!!.newbuilder() .cacheresponse(stripbody(cacheresponse)) .build() } //调用拦截器process从网络获取数据 var networkresponse: response? = null try { networkresponse = chain.proceed(networkrequest) } finally { // if we're crashing on i/o or otherwise, don't leak the cache body. if (networkresponse == null && cachecandidate != null) { cachecandidate.body?.closequietly() } } //如果有缓存的response if (cacheresponse != null) { //如果网络请求返回code为304 即说明资源未修改 if (networkresponse?.code == http_not_modified) { //直接封装封装缓存的response返回即可 val response = cacheresponse.newbuilder() .headers(combine(cacheresponse.headers, networkresponse.headers)) .sentrequestatmillis(networkresponse.sentrequestatmillis) .receivedresponseatmillis(networkresponse.receivedresponseatmillis) .cacheresponse(stripbody(cacheresponse)) .networkresponse(stripbody(networkresponse)) .build() networkresponse.body!!.close() // update the cache after combining headers but before stripping the // content-encoding header (as performed by initcontentstream()). cache!!.trackconditionalcachehit() cache.update(cacheresponse, response) return response } else { cacheresponse.body?.closequietly() } } val response = networkresponse!!.newbuilder() .cacheresponse(stripbody(cacheresponse)) .networkresponse(stripbody(networkresponse)) .build() if (cache != null) { //判断是否具有主体 并且 是否可以缓存供后续使用 if (response.promisesbody() && cachestrategy.iscacheable(response, networkrequest)) { // 加入缓存中 val cacherequest = cache.put(response) return cachewritingresponse(cacherequest, response) } //如果请求方法无效 就从缓存中remove掉 if (httpmethod.invalidatescache(networkrequest.method)) { try { cache.remove(networkrequest) } catch (_: ioexception) { // the cache cannot be written. } } } return response }
3.5 connectinterceptor
此类负责建⽴连接。 包含了⽹络请求所需要的 tcp 连接(http),或者 tcp 之前的 tls 连接(https),并且会创建出对应的 httpcodec 对象(⽤于编码解码 http 请求)。
@throws(ioexception::class) override fun intercept(chain: interceptor.chain): response { val realchain = chain as realinterceptorchain val exchange = realchain.call.initexchange(chain) val connectedchain = realchain.copy(exchange = exchange) return connectedchain.proceed(realchain.request) }
看似短短四行实际工作还是比较多的。
/** finds a new or pooled connection to carry a forthcoming request and response. */ internal fun initexchange(chain: realinterceptorchain): exchange { ... //codec是对 http 协议操作的抽象,有两个实现:http1codec和http2codec,对应 http/1.1 和 http/2。 val codec = exchangefinder.find(client, chain) val result = exchange(this, eventlistener, exchangefinder, codec) ... return result } #exchangefinder.find fun find(client: okhttpclient,chain: realinterceptorchain):exchangecodec { try { //寻找一个可用的连接 val resultconnection = findhealthyconnection( connecttimeout = chain.connecttimeoutmillis, readtimeout = chain.readtimeoutmillis, writetimeout = chain.writetimeoutmillis, pingintervalmillis = client.pingintervalmillis, connectionretryenabled = client.retryonconnectionfailure, doextensivehealthchecks = chain.request.method != "get" ) return resultconnection.newcodec(client, chain) } catch (e: routeexception) { trackfailure(e.lastconnectexception) throw e } catch (e: ioexception) { trackfailure(e) throw routeexception(e) } } @throws(ioexception::class) private fun findhealthyconnection( connecttimeout: int, readtimeout: int, writetimeout: int, pingintervalmillis: int, connectionretryenabled: boolean, doextensivehealthchecks: boolean ): realconnection { while (true) { //寻找连接 val candidate = findconnection( connecttimeout = connecttimeout, readtimeout = readtimeout, writetimeout = writetimeout, pingintervalmillis = pingintervalmillis, connectionretryenabled = connectionretryenabled ) //确认找到的连接可用并返回 if (candidate.ishealthy(doextensivehealthchecks)) { return candidate } ... throw ioexception("exhausted all routes") } } @throws(ioexception::class) private fun findconnection( connecttimeout: int, readtimeout: int, writetimeout: int, pingintervalmillis: int, connectionretryenabled: boolean ): realconnection { if (call.iscanceled()) throw ioexception("canceled") // 1. 尝试重用这个call的连接 比如重定向需要再次请求 那么这里就会重用之前的连接 val callconnection = call.connection if (callconnection != null) { var toclose: socket? = null synchronized(callconnection) { if (callconnection.nonewexchanges || !samehostandport(callconnection.route().address.url)) { toclose = call.releaseconnectionnoevents() } } //返回这个连接 if (call.connection != null) { check(toclose == null) return callconnection } // the call's connection was released. toclose?.closequietly() eventlistener.connectionreleased(call, callconnection) } ... // 2. 尝试从连接池中找一个连接 找到就返回连接 if (connectionpool.callacquirepooledconnection(address, call, null, false)) { val result = call.connection!! eventlistener.connectionacquired(call, result) return result } // 3. 如果连接池中没有 计算出下一次要尝试的路由 val routes: list<route>? val route: route if (nextroutetotry != null) { // use a route from a preceding coalesced connection. routes = null route = nextroutetotry!! nextroutetotry = null } else if (routeselection != null && routeselection!!.hasnext()) { // use a route from an existing route selection. routes = null route = routeselection!!.next() } else { // compute a new route selection. this is a blocking operation! var localrouteselector = routeselector if (localrouteselector == null) { localrouteselector = routeselector(address, call.client.routedatabase, call, eventlistener) this.routeselector = localrouteselector } val localrouteselection = localrouteselector.next() routeselection = localrouteselection routes = localrouteselection.routes if (call.iscanceled()) throw ioexception("canceled") // now that we have a set of ip addresses, make another attempt at getting a connection from // the pool. we have a better chance of matching thanks to connection coalescing. if (connectionpool.callacquirepooledconnection(address, call, routes, false)) { val result = call.connection!! eventlistener.connectionacquired(call, result) return result } route = localrouteselection.next() } // connect. tell the call about the connecting call so async cancels work. // 4.到这里还没有找到可用的连接 但是找到了 route 即路由 进行socket/tls连接 val newconnection = realconnection(connectionpool, route) call.connectiontocancel = newconnection try { newconnection.connect( connecttimeout, readtimeout, writetimeout, pingintervalmillis, connectionretryenabled, call, eventlistener ) } finally { call.connectiontocancel = null } call.client.routedatabase.connected(newconnection.route()) // if we raced another call connecting to this host, coalesce the connections. this makes for 3 // different lookups in the connection pool! // 4.查找是否有多路复用(http2)的连接,有就返回 if (connectionpool.callacquirepooledconnection(address, call, routes, true)) { val result = call.connection!! nextroutetotry = route newconnection.socket().closequietly() eventlistener.connectionacquired(call, result) return result } synchronized(newconnection) { //放入连接池中 connectionpool.put(newconnection) call.acquireconnectionnoevents(newconnection) } eventlistener.connectionacquired(call, newconnection) return newconnection }
接下来看看是如何建立连接的
fun connect( connecttimeout: int, readtimeout: int, writetimeout: int, pingintervalmillis: int, connectionretryenabled: boolean, call: call, eventlistener: eventlistener ) { ... while (true) { try { if (route.requirestunnel()) { //创建tunnel,用于通过http代理访问https //其中包含connectsocket、createtunnel connecttunnel(connecttimeout, readtimeout, writetimeout, call, eventlistener) if (rawsocket == null) { // we were unable to connect the tunnel but properly closed down our resources. break } } else { //不创建tunnel就创建socket连接 获取到数据流 connectsocket(connecttimeout, readtimeout, call, eventlistener) } //建立协议连接tsl establishprotocol(connectionspecselector, pingintervalmillis, call, eventlistener) eventlistener.connectend(call, route.socketaddress, route.proxy, protocol) break } catch (e: ioexception) { ... } } ... }
建立tsl连接
@throws(ioexception::class) private fun establishprotocol( connectionspecselector: connectionspecselector, pingintervalmillis: int, call: call, eventlistener: eventlistener ) { //ssl为空 即http请求 明文请求 if (route.address.sslsocketfactory == null) { if (protocol.h2_prior_knowledge in route.address.protocols) { socket = rawsocket protocol = protocol.h2_prior_knowledge starthttp2(pingintervalmillis) return } socket = rawsocket protocol = protocol.http_1_1 return } //否则为https请求 需要连接sslsocket 验证证书是否可被服务器接受 保存tsl返回的信息 eventlistener.secureconnectstart(call) connecttls(connectionspecselector) eventlistener.secureconnectend(call, handshake) if (protocol === protocol.http_2) { starthttp2(pingintervalmillis) } }
至此,创建好了连接,返回到最开始的 find() 方法返回 exchangecodec 对象,再包装为 exchange 对象用来下一个拦截器操作。
3.6 callserverinterceptor
这个类负责实质的请求与响应的 i/o 操作,即往 socket ⾥写⼊请求数据,和从 socket ⾥读取响应数据。
总结
用一张 @piasy 的图来做总结,图很干练结构也很清晰。
以上就是android okhttp的启动流程及源码解析的详细内容,更多关于android okhttp的启动流程的资料请关注其它相关文章!
上一篇: 西湖醋鱼可以用鲈鱼吗
下一篇: 广州踏青的地方有哪些 广州踏青郊游好去处
推荐阅读
-
策略模式及Android源码中的应用解析
-
Android Service的启动流程源码分析
-
ROS源码研究(二):命令roscore执行流程及添加开机自动启动的节点(类似/rosout)
-
Android okhttp的启动流程及源码解析
-
Tomcat的类加载机制流程及源码解析
-
带你一步步剖析Retrofit 源码解析:一款基于 OkHttp 实现的网络请求框架 android源码解析网络请求OKHTTPRetrofit
-
策略模式及Android源码中的应用解析
-
Android Service的启动流程源码分析
-
Android APP及ActivityThread启动流程是如何的
-
Android okhttp的启动流程及源码解析