欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

详解用RxJava实现事件总线(Event Bus)

程序员文章站 2023-11-06 22:07:22
目前大多数开发者使用eventbus或者otto作为事件总线通信库,对于rxjava使用者来说,rxjava也可以轻松实现事件总线,因为它们都依据于观察者模式。 不多说,...

目前大多数开发者使用eventbus或者otto作为事件总线通信库,对于rxjava使用者来说,rxjava也可以轻松实现事件总线,因为它们都依据于观察者模式。

不多说,上代码

/**
* rxbus
* created by yokeyword on 2015/6/17.
*/
public class rxbus {
  private static volatile rxbus defaultinstance;

  private final subject<object, object> bus;
  // publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者
  public rxbus() {
   bus = new serializedsubject<>(publishsubject.create());
  }
  // 单例rxbus
  public static rxbus getdefault() {
    if (defaultinstance == null) {
      synchronized (rxbus.class) {
        if (defaultinstance == null) {
          defaultinstance = new rxbus();
        }
      }
    }
    return defaultinstance ;
  }
  // 发送一个新的事件
  public void post (object o) {
    bus.onnext(o);
  }
  // 根据传递的 eventtype 类型返回特定类型(eventtype)的 被观察者
  public <t> observable<t> toobservable (class<t> eventtype) {
    return bus.oftype(eventtype);
//    这里感谢小鄧子的提醒: oftype = filter + cast
//    return bus.filter(new func1<object, boolean>() {
//      @override
//      public boolean call(object o) {
//        return eventtype.isinstance(o);
//      }
//    }) .cast(eventtype);
  }
}

注:

1、subject同时充当了observer和observable的角色,subject是非线程安全的,要避免该问题,需要将 subject转换为一个 serializedsubject ,上述rxbus类中把线程非安全的publishsubject包装成线程安全的subject。

2、publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者。

3、oftype操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子  的提醒)

public final <r> observable<r> oftype(final class<r> klass) {
  return filter(new func1<t, boolean>() {
    @override
    public final boolean call(t t) {
      return klass.isinstance(t);
    }
  }).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。

cast操作符可以将一个observable转换成指定类型的observable。

分析:

详解用RxJava实现事件总线(Event Bus)

rxbus工作流程图

1、首先创建一个可同时充当observer和observable的subject;

2、在需要接收事件的地方,订阅该subject(此时subject是作为observable),在这之后,一旦subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至subject,此时subject作为observer接收到事件(onnext),然后会发射给所有订阅该subject的订阅者。

对于rxbus的使用,就和普通的rxjava订阅事件很相似了。

先看发送事件的代码:

rxbus.getdefault().post(new userevent (1, "yoyo"));

userevent是要发送的事件,如果你用过eventbus, 很容易理解,userevent的代码:

public class userevent {
  long id;
  string name;
  public userevent(long id,string name) {
    this.id= id;
    this.name= name;
  }
  public long getid() {
    return id;
  }
  public string getname() {
    return name;
  }
}

再看接收事件的代码:

// rxsubscription是一个subscription的全局变量,这段代码可以在oncreate/onstart等生命周期内
rxsubscription = rxbus.getdefault().toobserverable(userevent.class)
    .subscribe(new action1<userevent>() {
        @override
        public void call(userevent userevent) {
          long id = userevent.getid();
          string name = userevent.getname();
          ...
        }
      },
    new action1<throwable>() {
      @override
      public void call(throwable throwable) {
        // todo: 处理异常
      }    
    });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止rxjava可能会引起的内存泄漏问题。

@override
protected void ondestroy() {
  super.ondestroy();
  if(!rxsubscription.isunsubscribed()) {
    rxsubscription.unsubscribe();
  }
}

这样,一个简单的event bus就实现了!如果你的项目已经开始使用rxjava,也许可以考虑替换掉eventbus或otto,减小项目体积。

rxbus、eventbus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、subject来代替事件总线。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。