RxJava2 中具有多个订阅者的单个异步事件
A single async event with many subscribers in RxJava2
我有一个 class 可以处理图像,这可能是一个缓慢的过程。工作完成后,class 包含有关图像的一些特征,例如主色。
我还有很多其他代码想要知道主色,当他们要求时,它可能准备好也可能没有准备好。
我还没有找到使用 RxJava2 来实现它的简单方法。有人可以帮助我吗?
总而言之,如果我能创建一个方法,那就太好了:
- 允许多个订阅者 call/subscribe 到。
- 处理完成后,订阅者会收到结果。
- 订阅者会自动取消订阅以避免内存泄漏。不会有第二个活动,也没有理由继续订阅。
- 稍后 subscribe/call 方法仅获取缓存值的订阅者。
ReplaySubject 似乎有一些我正在寻找的属性,但我不确定如何正确实现它。
'1. Allows multiple subscribers to call/subscribe to.
'4. Subscribers which subscribe/call the method at a later point just gets the the cached value.
结合使用 replay(1)
和 autoConnect()
。这将导致一个 observable 共享对源的单个订阅,并重播源发出的最后一个值。 autoConnect()
确保第一个订阅者订阅时直接订阅源。
- When the processing is done, the subscribers receives the result.
使用 Observable.create()
并使用 ObservableEmitter
发出结果。
- The subscribers are automatically unsubscripted to avoid memory leaks. There will be no second event, and no reason to still be subscribed.
将 Observable
转换为 Single
。
以下内容应该有效:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> e) throws Exception {
Thread.sleep(5000);
e.onNext("Test");
e.onComplete();
}
}).replay(1).autoConnect()
.firstOrError();
请注意,您应该保留对此 Observable
的引用(firstOrError()
的结果)并与订阅者共享该实例。
我有一个 class 可以处理图像,这可能是一个缓慢的过程。工作完成后,class 包含有关图像的一些特征,例如主色。
我还有很多其他代码想要知道主色,当他们要求时,它可能准备好也可能没有准备好。
我还没有找到使用 RxJava2 来实现它的简单方法。有人可以帮助我吗?
总而言之,如果我能创建一个方法,那就太好了:
- 允许多个订阅者 call/subscribe 到。
- 处理完成后,订阅者会收到结果。
- 订阅者会自动取消订阅以避免内存泄漏。不会有第二个活动,也没有理由继续订阅。
- 稍后 subscribe/call 方法仅获取缓存值的订阅者。
ReplaySubject 似乎有一些我正在寻找的属性,但我不确定如何正确实现它。
'1. Allows multiple subscribers to call/subscribe to.
'4. Subscribers which subscribe/call the method at a later point just gets the the cached value.
结合使用 replay(1)
和 autoConnect()
。这将导致一个 observable 共享对源的单个订阅,并重播源发出的最后一个值。 autoConnect()
确保第一个订阅者订阅时直接订阅源。
- When the processing is done, the subscribers receives the result.
使用 Observable.create()
并使用 ObservableEmitter
发出结果。
- The subscribers are automatically unsubscripted to avoid memory leaks. There will be no second event, and no reason to still be subscribed.
将 Observable
转换为 Single
。
以下内容应该有效:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> e) throws Exception {
Thread.sleep(5000);
e.onNext("Test");
e.onComplete();
}
}).replay(1).autoConnect()
.firstOrError();
请注意,您应该保留对此 Observable
的引用(firstOrError()
的结果)并与订阅者共享该实例。