RxJava2 中具有多个订阅者的单个异步事件

A single async event with many subscribers in RxJava2

我有一个 class 可以处理图像,这可能是一个缓慢的过程。工作完成后,class 包含有关图像的一些特征,例如主色。

我还有很多其他代码想要知道主色,当他们要求时,它可能准备好也可能没有准备好。

我还没有找到使用 RxJava2 来实现它的简单方法。有人可以帮助我吗?

总而言之,如果我能创建一个方法,那就太好了:

  1. 允许多个订阅者 call/subscribe 到。
  2. 处理完成后,订阅者会收到结果。
  3. 订阅者会自动取消订阅以避免内存泄漏。不会有第二个活动,也没有理由继续订阅。
  4. 稍后 subscribe/call 方法仅获取缓存值的订阅者。

ReplaySubject 似乎有一些我正在寻找的属性,但我不确定如何正确实现它。

'1. Allows multiple subscribers to call/subscribe to.
'4. Subscribers which subscribe/call the method at a later point just gets the the cached value.

结合使用 replay(1)autoConnect()。这将导致一个 observable 共享对源的单个订阅,并重播源发出的最后一个值。 autoConnect() 确保第一个订阅者订阅时直接订阅源。

  1. When the processing is done, the subscribers receives the result.

使用 Observable.create() 并使用 ObservableEmitter 发出结果。

  1. The subscribers are automatically unsubscripted to avoid memory leaks. There will be no second event, and no reason to still be subscribed.

Observable 转换为 Single


以下内容应该有效:

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(final ObservableEmitter<String> e) throws Exception {
    Thread.sleep(5000);
    e.onNext("Test");
    e.onComplete();
  }
}).replay(1).autoConnect()
    .firstOrError();

请注意,您应该保留对此 Observable 的引用(firstOrError() 的结果)并与订阅者共享该实例。