如何像RxJava1那样在RxJava2中使用Subject(AsyncSubject, BehaviorSubject...)?
How to use Subject (AsyncSubject, BehaviorSubject...) in RxJava2 like the way RxJava1 did?
在 Rxjava1 中,我们可以像这样订阅 Subject 到 Observable:
val asyncSubject = AsyncSubject<T>.create()
Observable<T>.subscribe(asyncSubject);
asyncSubject.subscribe(...)
不知道如何在 Rxjava2 Flowable 中实现同样的事情?
我正在考虑这样的事情
Flowable<T>
.doOnComplete { t -> asyncSubject.OnComplete() }
.subscribe { t -> asyncSubject.onNext(t) }
有没有better/more简洁的方法?
RxJava2 将内容分为 Flowable
和 Observable
。当你想要 back-pressure.
时使用 Flowables
由于这个 break-out,我们有两种不同类型的 Subject
对象,您的常规旧 BehaviorSubject
等与 Observable
一起使用,以及一个新的一组类称为Processors
,如BehaviorProcessor
等
这些 类 的工作方式与主题大致相同,但可用于 Flowables
而不是 Observables
。
在您的情况下,您可能希望使用 Processor
而不是主题:
val proc = AsyncProcessor.create<Int>()
val flowable = Flowable.just(1)
flowable.subscribe(proc)
在 Rxjava1 中,我们可以像这样订阅 Subject 到 Observable:
val asyncSubject = AsyncSubject<T>.create()
Observable<T>.subscribe(asyncSubject);
asyncSubject.subscribe(...)
不知道如何在 Rxjava2 Flowable 中实现同样的事情?
我正在考虑这样的事情
Flowable<T>
.doOnComplete { t -> asyncSubject.OnComplete() }
.subscribe { t -> asyncSubject.onNext(t) }
有没有better/more简洁的方法?
RxJava2 将内容分为 Flowable
和 Observable
。当你想要 back-pressure.
由于这个 break-out,我们有两种不同类型的 Subject
对象,您的常规旧 BehaviorSubject
等与 Observable
一起使用,以及一个新的一组类称为Processors
,如BehaviorProcessor
等
这些 类 的工作方式与主题大致相同,但可用于 Flowables
而不是 Observables
。
在您的情况下,您可能希望使用 Processor
而不是主题:
val proc = AsyncProcessor.create<Int>()
val flowable = Flowable.just(1)
flowable.subscribe(proc)