谈谈RxJava2中的异常及处理方法

  • 2022-08-30 11:59:15

前言

众所周知,rxjava2 中当链式调用中抛出异常时,如果没有对应的 consumer 去处理异常,则这个异常会被抛出到虚拟机中去,android 上的直接表现就是 crash,程序崩溃。

订阅方式

说异常处理前咱们先来看一下 rxjava2 中 observable 订阅方法 subscribe() 我们常用的几种订阅方式:

// 1
subscribe()
// 2
disposable subscribe(consumer<? super t> onnext)
// 3
disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror)
// 4
disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror,action oncomplete)
// 5
disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror,action oncomplete, consumer<? super disposable> onsubscribe)
// 6
void subscribe(observer<? super t> observer)

无参和以 consumer为参数的几种方法内部都是以默认参数补齐的方式最终调用第 5 个方法,而方法 5 内部通过 lambdaobserver 将参数包装成 observer 再调用第 6 个方法

 public final disposable subscribe(consumer<? super t> onnext, consumer<? super throwable> onerror,
 action oncomplete, consumer<? super disposable> onsubscribe) {
 objecthelper.requirenonnull(onnext, "onnext is null");
 objecthelper.requirenonnull(onerror, "onerror is null");
 objecthelper.requirenonnull(oncomplete, "oncomplete is null");
 objecthelper.requirenonnull(onsubscribe, "onsubscribe is null");

 lambdaobserver<t> ls = new lambdaobserver<t>(onnext, onerror, oncomplete, onsubscribe);

 subscribe(ls);

 return ls;
 }

所以使用 consumer 参数方式和  observer 参数方式进行订阅除了观察回调来源不一样其他没有任何差别。但就是因为这种差别,在异常情况发生时的处理结果上也会产生差别

异常处理

我们分别进行一下几种方式模拟异常:

1、observer onnext 中抛出异常(切换线程)

 apiservice.newjsonkeydata()
  .doonsubscribe { t -> compositedisposable.add(t) }
  .compose(rxscheduler.sync()) // 封装的线程切换
  .subscribe(object : observer<list<zoodata>> {
  override fun oncomplete() {

  }

  override fun onsubscribe(d: disposable) {

  }

  override fun onnext(t: list<zoodata>) {
  throw runtimeexception("runtime exception")
  }

  override fun onerror(e: throwable) {
  log.d("error", e.message)
  }

  })

结果:不会触发 onerror,app 崩溃

2、observer onnext 中抛出异常(未切换线程)

  observable.create<string> {
   it.onnext("ssss")
   }
    .subscribe(object : observer<string> {
    override fun oncomplete() {

    }

    override fun onsubscribe(d: disposable) {

    }

    override fun onnext(t: string) {
     log.d("result::", t)
     throw runtimeexception("run llllll")
    }

    override fun onerror(e: throwable) {
     log.e("sss", "sss", e)
    }

    })

结果:会触发 onerror,app 未崩溃

3、observer map 操作符中抛出异常

  apiservice.newjsonkeydata()
   .doonsubscribe { t -> compositedisposable.add(t) }
   .map {
   throw runtimeexception("runtime exception")
   }
   .compose(rxscheduler.sync())
   .subscribe(object : observer<list<zoodata>> {
   override fun oncomplete() {

   }

   override fun onsubscribe(d: disposable) {

   }

   override fun onnext(t: list<zoodata>) {

   }

   override fun onerror(e: throwable) {
    log.d("error", e.message)
   }

   })

结果:会触发 observer 的 onerror,app 未崩溃

4、consumer onnext 中抛出异常

  apiservice.newjsonkeydata()
   .doonsubscribe { t -> compositedisposable.add(t) }
   .compose(rxscheduler.sync())
   .subscribe({
   throw runtimeexception("messsasassssssssssssssssssssssssssssssssssssss")
   }, {
   log.d("error", it.message)
   })

结果 a:有 errorconsumer 触发 errorconsumer,app 未崩溃

 apiservice.newjsonkeydata()
   .doonsubscribe { t -> compositedisposable.add(t) }
   .compose(rxscheduler.sync())
   .subscribe {
   throw runtimeexception("messsasassssssssssssssssssssssssssssssssssssss")
   }

结果 b:无 errorconsumer,app 崩溃

那么为什么会出现这些不同情况呢?我们从源码中去一探究竟。

consumer 订阅方式的崩溃与不崩溃

subscribe() 传入 consumer 类型参数最终在 observable 中会将传入的参数转换为 lambdaobserver 再调用 subscribe(lambdaobserver)进行订阅。展开  lambdaobserver:(主要看 onnext 和 onerror 方法中的处理)

		.
		.
		.
		 @override
 public void onnext(t t) {
 if (!isdisposed()) {
  try {
  onnext.accept(t);
  } catch (throwable e) {
  exceptions.throwiffatal(e);
  get().dispose();
  onerror(e);
  }
 }
 }

 @override
 public void onerror(throwable t) {
 if (!isdisposed()) {
  lazyset(disposablehelper.disposed);
  try {
  onerror.accept(t);
  } catch (throwable e) {
  exceptions.throwiffatal(e);
  rxjavaplugins.onerror(new compositeexception(t, e));
  }
 } else {
  rxjavaplugins.onerror(t);
 }
 }
		.
		.
		.

onnext 中调用了对应 consumer 的 apply() 方法,并且进行了 try catch。因此我们在 consumer 中进行的工作抛出异常会被捕获触发 lambdaobserver 的 onerror。再看 onerror 中,如果订阅未取消且 errorconsumer 的 apply() 执行无异常则能正常走完事件流,否则会调用 rxjavaplugins.onerror(t)。看到这里应该就能明白了,当订阅时未传入 errorconsumer时 observable 会指定 onerrormissingconsumer 为默认的 errorconsumer,发生异常时抛出 onerrornotimplementedexception。

rxjavaplugins.onerror(t)

上面分析,发现异常最终会流向 rxjavaplugins.onerror(t)。这个方法为 rxjava2 提供的一个全局的静态方法。

 public static void onerror(@nonnull throwable error) {
 consumer<? super throwable> f = errorhandler;

 if (error == null) {
  error = new nullpointerexception("onerror called with null. null values are generally not allowed in 2.x operators and sources.");
 } else {
  if (!isbug(error)) {
  error = new undeliverableexception(error);
  }
 }

 if (f != null) {
  try {
  f.accept(error);
  return;
  } catch (throwable e) {
  // exceptions.throwiffatal(e); todo decide
  e.printstacktrace(); // nopmd
  uncaught(e);
  }
 }

 error.printstacktrace(); // nopmd
 uncaught(error);
 }

查看其源码发现,当 errorhandler 不为空时异常将由其消耗掉,为空或者消耗过程产生新的异常则 rxjava 会将异常抛给虚拟机(可能导致程序崩溃)。 errorhandler本身是一个 consumer 对象,我们可以通过如下方式配置他:

 rxjavaplugins.seterrorhandler(object : consumer1<throwable> {
 override fun accept(t: throwable?) {
  todo("not implemented") //to change body of created functions use file | settings | file templates.
 }

 })

数据操作符中抛出异常

以 map 操作符为例,map 操作符实际上 rxjava 是将事件流 hook 了另一个新的 observable observablemap

 @checkreturnvalue
 @schedulersupport(schedulersupport.none)
 public final <r> observable<r> map(function<? super t, ? extends r> mapper) {
 objecthelper.requirenonnull(mapper, "mapper is null");
 return rxjavaplugins.onassembly(new observablemap<t, r>(this, mapper));
 }

进入 observablemap 类,发现内部订阅了一个内部静态类 mapobserver,重点看 mapobserver  的 onnext 方法

 public void onnext(t t) {
  if (done) {
  return;
  }

  if (sourcemode != none) {
  downstream.onnext(null);
  return;
  }

  u v;

  try {
  v = objecthelper.requirenonnull(mapper.apply(t), "the mapper function returned a null value.");
  } catch (throwable ex) {
  fail(ex);
  return;
  }
  downstream.onnext(v);
 }

onnext 中 try catch 了 mapper.apply(),这个 apply 执行的就是我们在操作符中实现的 function 方法。因此在 map 之类数据变换操作符中产生异常能够自身捕获并发送给最终的 observer。如果此时的订阅对象中能消耗掉异常则事件流正常走 onerror() 结束,如果订阅方式为上以节中的 consumer,则崩溃情况为上一节中的分析结果。

observer 的 onnext 中抛出异常

上述的方式 1 为一次网络请求,里面涉及到线程的切换。方式 2 为直接 create 一个 observable 对象,不涉及线程切换,其结果为线程切换后,观察者 observer 的 onnext() 方法中抛出异常无法触发 onerror(),程序崩溃。

未切换线程的 observable.create

查看 create() 方法源码,发现内部创建了一个 observablecreate 对象,在调用订阅时会触发 subscribeactual()  方法。在  subscribeactual() 中再调用我们 create 时传入的 observableonsubscribe 对象的 subscribe() 方法来触发事件流。

 @override
 protected void subscribeactual(observer<? super t> observer) {
	
		// 对我们的观察者使用 createemitter 进行包装,内部的触发方法是相对应的
 createemitter<t> parent = new createemitter<t>(observer);
 observer.onsubscribe(parent);

 try {
			// source 为 create 时创建的 observableonsubscribe 匿名内部接口实现类
  source.subscribe(parent);
 } catch (throwable ex) {
  exceptions.throwiffatal(ex);
  parent.onerror(ex);
 }
 }

上述代码中的订阅过程是使用 try catch 今夕包裹的。订阅及订阅触发后发送的事件流都在一个线程,所以能够捕获整个事件流中的异常。(ps : 大家可以尝试下使用  observeon() 切换事件发送线程。会发现异常不能再捕获,程序崩溃)

涉及线程变换时的异常处理

retrofit 进行网络请求返回的 observable 对象实质上是 rxjava2calladapter 中生成的 bodyobservable,期内部的 onnext 是没有进行异常捕获的。其实这里是否捕获并不是程序崩溃的根本原因,因为进行网络请求,必然是涉及到线程切换的。就算此处 try catch 处理了,也并不能捕获到事件流下游的异常。

 @override public void onnext(response<r> response) {
 if (response.issuccessful()) {
 observer.onnext(response.body());
 } else {
 terminated = true;
 throwable t = new httpexception(response);
 try {
  observer.onerror(t);
 } catch (throwable inner) {
  exceptions.throwiffatal(inner);
  rxjavaplugins.onerror(new compositeexception(t, inner));
 }
 }
 }

以我们在最终的 observer 的 onnext 抛出异常为例,要捕获这次异常那么必须在最终的调用线程中去进行捕获。即 .observeon(androidschedulers.mainthread()) 切换过来的 android 主线程。与其他操作符一样,线程切换时产生了一组新的订阅关系,rxjava 内部会创建一个新的观察对象 observableobserveon。

 @override
 public void onnext(t t) {
  if (done) {
  return;
  }

  if (sourcemode != queuedisposable.async) {
  queue.offer(t);
  }
  schedule();
 }
		.
		.
		.
		void schedule() {
  if (getandincrement() == 0) {
  worker.schedule(this); // 执行 observableobserveon 的 run 方法
  }
 }
		.
		.
		.
	 @override
 public void run() {
  if (outputfused) {
  drainfused();
  } else {
  drainnormal();
  }
 }
	

而执行任务的 worker 即为对应线程 scheduler 的对应实现子类所创建的 worker,以 androidschedulers.mainthread() 为例,scheduler 实现类为 handlerscheduler,其对应 worker 为 handlerworker,最终任务交给 scheduledrunnable 来执行。

 private static final class scheduledrunnable implements runnable, disposable {
 private final handler handler;
 private final runnable delegate;

 private volatile boolean disposed; // tracked solely for isdisposed().

 scheduledrunnable(handler handler, runnable delegate) {
  this.handler = handler;
  this.delegate = delegate;
 }

 @override
 public void run() {
  try {
  delegate.run();
  } catch (throwable t) {
  rxjavaplugins.onerror(t);
  }
 }

 @override
 public void dispose() {
  handler.removecallbacks(this);
  disposed = true;
 }

 @override
 public boolean isdisposed() {
  return disposed;
 }
 }

会发现,run 中 进行了 try catch。但 catch 内消化异常使用的是全局异常处理 rxjavaplugins.onerror(t);,而不是某一个观察者的 onerror。所以在经过切换线程操作符后,观察者 onnext 中抛出的异常,onerror 无法捕获。

处理方案

既然知道了问题所在,那么处理问题的方案也就十分清晰了。

1、注册全局的异常处理

 rxjavaplugins.seterrorhandler(object : consumer<throwable> {
  override fun accept(t: throwable?) {
  // do something 
  }

 })

2、consumer 作为观察者时,不完全确定没有异常一定要添加异常处理 consumer

 apiservice.stringdata()
   .doonsubscribe { t -> compositedisposable.add(t) }
   .compose(rxscheduler.sync())
   .subscribe(consumer<boolean>{ }, consumer<throwable> { })

3、observer 可以创建一个 baseobaerver 将 onnext 内部进行 try catch 人为的流转到 onerror 中,项目中的观察这都使用这个 baseobserver 的子类。

 @override
 public void onnext(t t) {
 try {
  onsuccess(t);
 } catch (exception e) {
  onerror(e);
 }
 data = t;
 success = true;
 }

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。

猜你喜欢