如何像RxJava1那样在RxJava2中使用Subject(AsyncSubject, BehaviorSubject...)?

How to use Subject (AsyncSubject, BehaviorSubject...) in RxJava2 like the way RxJava1 did?

在 Rxjava1 中,我们可以像这样订阅 Subject 到 Observable:

val asyncSubject = AsyncSubject<T>.create()
Observable<T>.subscribe(asyncSubject);
asyncSubject.subscribe(...)

不知道如何在 Rxjava2 Flowable 中实现同样的事情?

我正在考虑这样的事情

Flowable<T>
.doOnComplete { t -> asyncSubject.OnComplete() }
.subscribe { t -> asyncSubject.onNext(t) }

有没有better/more简洁的方法?

RxJava2 将内容分为 FlowableObservable。当你想要 back-pressure.

时使用 Flowables

由于这个 break-out,我们有两种不同类型的 Subject 对象,您的常规旧 BehaviorSubject 等与 Observable 一起使用,以及一个新的一组类称为Processors,如BehaviorProcessor

这些 类 的工作方式与主题大致相同,但可用于 Flowables 而不是 Observables

在您的情况下,您可能希望使用 Processor 而不是主题:

val proc = AsyncProcessor.create<Int>()
val flowable = Flowable.just(1)
flowable.subscribe(proc)