Can I notify a BehaviorProcessor from inside an RxJava stream?
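I would like your feedback on the following code.
I want to know whether it is safe to call currentSession.onNext(result.session) from within the SessionManager.signIn stream.
My first instinct is NO, because of multithreading and synchronization issues: with this code, currentSession.onNext(result.session) could be called from different threads.
Here is the code; please tell me what you think. Thanks!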
The singleton SessionManager:
@Singleton
class SessionManager @Inject constructor(
    private val sessionService: SessionService,
) {
    val currentSession = BehaviorProcessor.create<Session>()

    fun signIn(login: String, password: String): Single<Boolean> =
        sessionService.signIn(login, password)
            // Side effect: publish the new session on success.
            // Assuming sessionService.signIn returns a Single, the operator is
            // doOnSuccess (the Single counterpart of doOnNext).
            .doOnSuccess { result ->
                if (result is Success) {
                    currentSession.onNext(result.session)
                }
            }
            // Map the result to a plain success flag.
            .map { result -> result is Success }
            .subscribeOn(Schedulers.io())
}
HomeView is a random view that subscribes to the SessionManager sign-in stream:
class HomeView(context: Context) : View(context) {
    @Inject
    lateinit var sessionManager: SessionManager

    private val disposables = CompositeDisposable()

    override fun onAttachedToWindow() {
        super.onAttachedToWindow()
        disposables.add(
            sessionManager.signIn("username", "password")
                // A Single emits at most once, so distinctUntilChanged() is
                // neither needed nor available here.
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { result ->
                    textView.text = if (result) "Success" else "Fail"
                }
        )
    }

    override fun onDetachedFromWindow() {
        super.onDetachedFromWindow()
        // Drop pending subscriptions when the view leaves the window.
        disposables.clear()
    }
}
A random view that observes currentSession from SessionManager:
class RandomView(context: Context) : View(context) {
    @Inject
    lateinit var sessionManager: SessionManager

    private val disposables = CompositeDisposable()

    override fun onAttachedToWindow() {
        super.onAttachedToWindow()
        disposables.add(
            sessionManager.currentSession
                // Skip consecutive duplicate sessions.
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { session -> userTextView.text = session.userName }
        )
    }

    override fun onDetachedFromWindow() {
        super.onDetachedFromWindow()
        // Drop pending subscriptions when the view leaves the window.
        disposables.clear()
    }
}
The documentation of BehaviorProcessor says:
Calling onNext(Object), offer(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
So if you define it like this:
val currentSession = BehaviorProcessor.create<Session>().toSerialized()
then you can safely call onNext from any thread, and it will not cause any synchronization problems.
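To make the point about toSerialized() concrete, here is a minimal sketch that emits into a serialized processor from several plain threads at once. It assumes RxJava 3 package names (under RxJava 2 the classes live in io.reactivex.processors); the Session data class and the emitFromManyThreads helper are hypothetical stand-ins, not part of the question's code.

```kotlin
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.processors.FlowableProcessor
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Hypothetical stand-in for the Session type used in the question.
data class Session(val userName: String)

// Emits threadCount * perThread sessions concurrently and returns what one subscriber received.
fun emitFromManyThreads(threadCount: Int, perThread: Int): List<Session> {
    // toSerialized() wraps the processor so that overlapping onNext calls are serialized.
    val currentSession: FlowableProcessor<Session> =
        BehaviorProcessor.create<Session>().toSerialized()

    // Thread-safe sink, because the subscriber runs on whichever thread is emitting.
    val received = CopyOnWriteArrayList<Session>()
    val subscription = currentSession.subscribe { session -> received.add(session) }

    val done = CountDownLatch(threadCount)
    repeat(threadCount) { t ->
        thread {
            repeat(perThread) { i -> currentSession.onNext(Session("user-$t-$i")) }
            done.countDown()
        }
    }
    done.await() // wait until every emitting thread has finished
    subscription.dispose()
    return received
}
```

Calling emitFromManyThreads(4, 1000) should hand the subscriber every emitted session, in whatever interleaving the threads produce. Without toSerialized(), the same concurrent calls would violate the serialization requirement quoted above.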
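Remarks:
I agree that updating the processor belongs in a side-effect operator such as doOnNext (doOnSuccess on a Single) rather than in map.
I think it would be better to use a Completable instead of a Single<Boolean>, and to use Rx errors to signal what prevented the sign-in. You should also handle errors in the subscribe method.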
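The Completable suggestion could look roughly like the sketch below. It assumes RxJava 3 and a service that returns a Single of a sealed result. The SignInResult hierarchy, the Failure.reason field, SignInException, and this SessionService interface are all hypothetical, since the question does not show those types.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.processors.FlowableProcessor

// Hypothetical stand-ins for types the question does not show.
data class Session(val userName: String)

sealed class SignInResult
data class Success(val session: Session) : SignInResult()
data class Failure(val reason: String) : SignInResult()

// Hypothetical error type carrying the reason the sign-in was rejected.
class SignInException(reason: String) : Exception(reason)

// Hypothetical service contract: one result per sign-in attempt.
fun interface SessionService {
    fun signIn(login: String, password: String): Single<SignInResult>
}

class SessionManager(private val sessionService: SessionService) {
    // Serialized so that onNext may be called from any thread.
    val currentSession: FlowableProcessor<Session> =
        BehaviorProcessor.create<Session>().toSerialized()

    // Completes on success; signals SignInException when the sign-in is rejected.
    fun signIn(login: String, password: String): Completable =
        sessionService.signIn(login, password)
            .flatMapCompletable { result ->
                when (result) {
                    is Success -> Completable.fromAction { currentSession.onNext(result.session) }
                    is Failure -> Completable.error(SignInException(result.reason))
                }
            }
}
```

A caller would then pass both callbacks, for example signIn("username", "password").subscribe({ /* show "Success" */ }, { error -> /* show error.message */ }), so that a rejected sign-in reaches the error handler instead of arriving as false.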