我可以从 RxJava 流中通知 BehaviorProcessor 吗?

Can I notify a BehaviorProcessor from inside a RxJava stream?

我希望得到您对以下代码的反馈。 我想知道打电话给 currentSession.onNext(result.session) 是否安全 来自 SessionManager.signIn 流。

我的第一直觉是 NO 因为多线程和同步问题,也就是说,根据这段代码,我可以从不同的线程调用 currentSession.onNext(result.session)

这是代码,请告诉我你的想法!谢谢

单例的SessionManager

@Singleton
class SessionManager @Inject constructor(
    private val sessionService: SessionService,
){

    val currentSession = BehaviorProcessor.create<Session>()

    fun signIn(login: String, password: String): Single<Boolean> =
        sessionService.signIn(login, password)
            .doOnNext(result -> 
                if (session is Success) {
                   currentSession.onNext(result.session)
                }
            ).map { result ->
                when (result) {
                    is Success -> true
                    else -> false
                }
            }
            .subscribeOn(Schedulers.io())
}

HomeView 是订阅 SessionManager 登录流的随机视图

class HomeView(val context: Context) : View(context) {

        @Inject
        lateinit var sessionManager: SessionManager

        private val disposables = CompositeDisposable()

        override fun onAttachedToWindow() {
            super.onAttachedToWindow()

            disposables.add(sessionManager.signIn("username", "password")
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { result ->
                    textView.text = if (result) "Success" else "Fail"
                })
        }

        override fun onDetachedFromWindow() {
            super.onDetachedFromWindow()
            disposables.clear()
        }
    }

SessionManager

观察 currentSession 的随机视图
class RandomView(val context: Context) : View(context) {

        @Inject
        lateinit var sessionManager: SessionManager

        private val disposables = CompositeDisposable()

        override fun onAttachedToWindow() {
            super.onAttachedToWindow()

            disposables.add(sessionManager.currentSession
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { session -> userTextView.text = session.userName })
        }

        override fun onDetachedFromWindow() {
            super.onDetachedFromWindow()
            disposables.clear()
        }
    }

documentation of BehaviorProcessor 说:

Calling onNext(Object), offer(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).

所以如果你这样定义它:

val currentSession = BehaviorProcessor.create<Session>().toSerialized()

然后你可以安全地从任何线程调用onNext,它不会导致任何同步问题。

备注:

我同意处理器的更新应该在 doOnNext 而不是 map

我认为最好使用 Completable 而不是 Single<Boolean>,并使用 Rx 错误来指示阻止登录的原因。您还应该在 subscribe 方法。