RxJava Subject 在不正确的调度程序上发出
RxJava Subject emits on incorrect scheduler
我有以下 class 单身人士:
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
在activity中,我观察会话并对每次更改执行网络操作:
private Subscription init() {
return sessionStore
.observe()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribe(...);
}
当我订阅会话存储时,主题立即在 io()
上发出,因为它是 BehaviourSubject
并且订阅者在 mainThread()
上执行。
当我在已经订阅的情况下调用 sessionStore.set(new AnotherSession())
时,问题就出现了。 IMO 这应该在 io()
调度程序上执行 init()
中定义的流。然而,相反发生的是流在调用 subject.onNext()
的同一线程上执行。结果 NetworkOnMainThreadException
因为我在 flatMap()
.
中进行网络操作
我是不是理解错了?我会这样滥用它们吗?那么请问正确的解决方法是什么?
我也尝试在 observe()
方法中用 Observable.fromEmitter()
替换整个主题方法,但令人惊讶的是输出是完全相同的。
当您调用操作员时,它会影响整个下游。如果您致电:
.observeOn(AndroidSchedulers.mainThread())
在错误的地方,流的其余部分在指定的线程上执行。
我建议你总是添加:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
在流的最后:
private Subscription init() {
return sessionStore
.observe()
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
}
我想你忘记了你的 Subject
也是一个观察者所以为了在 io 线程上获得 onNext
到 运行 试试
public class SessionStore {
Subject<Session, Session> subject;
public UserSessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session())).observeOn(Schedulers.io());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
请看书'Reactive Programming with RxJava'
中的以下部分
默认情况下,在 Subject 上调用 onNext() 会直接传播到所有 Observer 的 onNext() 回调方法。这些方法共享相同的名称也就不足为奇了。在某种程度上,调用 Subject 上的 onNext() 会间接调用每个 Subscriber 上的 onNext()。
让我们回顾一下:
如果您在 Thread-1 的 Subject 上调用 onNext,它会从 Thread-1 调用 onNext 给订阅者。 onSubscribe 将被丢弃。
首先要做的是:
订阅将在哪个线程上进行:
retrofitService.getAThing()
我只是猜测,并说它是调用线程。这将是 observeOn 中描述的线程,即 Android-UI-Loop.
observeOn 下的每个值都将按照调度程序的指定从 Thread-a 转移到 Thread-b。 UI-Loop 上的 observeOn 应该在订阅之前发生。订阅中收到的每个值都将在 UI-Loop 上,这不会阻塞 UI 线程或以异常结束。
请看一下示例代码和输出:
class SessionStore {
private Subject<String, String> subject;
public SessionStore() {
subject = BehaviorSubject.create("wurst").toSerialized();
}
public void set(String session) {
subject.onNext(session);
}
public Observable<String> observe() {
return subject
.asObservable()
.doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
.distinctUntilChanged();
}
}
@Test
public void name() throws Exception {
// init
SessionStore sessionStore = new SessionStore();
TestSubscriber testSubscriber = new TestSubscriber();
Subscription subscribe = sessionStore
.observe()
.flatMap(s -> {
return Observable.fromCallable(() -> {
System.out.println("flatMap Thread:: " + Thread.currentThread());
return s;
}).subscribeOn(Schedulers.io());
})
.doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
.observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
.subscribe(testSubscriber); // Do UI-Stuff in subscribe
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("123");
}).start();
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("345");
}).start();
boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
Assert.assertTrue(b);
}
输出::
Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
我有以下 class 单身人士:
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
在activity中,我观察会话并对每次更改执行网络操作:
private Subscription init() {
return sessionStore
.observe()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribe(...);
}
当我订阅会话存储时,主题立即在 io()
上发出,因为它是 BehaviourSubject
并且订阅者在 mainThread()
上执行。
当我在已经订阅的情况下调用 sessionStore.set(new AnotherSession())
时,问题就出现了。 IMO 这应该在 io()
调度程序上执行 init()
中定义的流。然而,相反发生的是流在调用 subject.onNext()
的同一线程上执行。结果 NetworkOnMainThreadException
因为我在 flatMap()
.
我是不是理解错了?我会这样滥用它们吗?那么请问正确的解决方法是什么?
我也尝试在 observe()
方法中用 Observable.fromEmitter()
替换整个主题方法,但令人惊讶的是输出是完全相同的。
当您调用操作员时,它会影响整个下游。如果您致电:
.observeOn(AndroidSchedulers.mainThread())
在错误的地方,流的其余部分在指定的线程上执行。
我建议你总是添加:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
在流的最后:
private Subscription init() {
return sessionStore
.observe()
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
}
我想你忘记了你的 Subject
也是一个观察者所以为了在 io 线程上获得 onNext
到 运行 试试
public class SessionStore {
Subject<Session, Session> subject;
public UserSessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session())).observeOn(Schedulers.io());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
请看书'Reactive Programming with RxJava'
中的以下部分默认情况下,在 Subject 上调用 onNext() 会直接传播到所有 Observer 的 onNext() 回调方法。这些方法共享相同的名称也就不足为奇了。在某种程度上,调用 Subject 上的 onNext() 会间接调用每个 Subscriber 上的 onNext()。
让我们回顾一下: 如果您在 Thread-1 的 Subject 上调用 onNext,它会从 Thread-1 调用 onNext 给订阅者。 onSubscribe 将被丢弃。
首先要做的是: 订阅将在哪个线程上进行:
retrofitService.getAThing()
我只是猜测,并说它是调用线程。这将是 observeOn 中描述的线程,即 Android-UI-Loop.
observeOn 下的每个值都将按照调度程序的指定从 Thread-a 转移到 Thread-b。 UI-Loop 上的 observeOn 应该在订阅之前发生。订阅中收到的每个值都将在 UI-Loop 上,这不会阻塞 UI 线程或以异常结束。
请看一下示例代码和输出:
class SessionStore {
private Subject<String, String> subject;
public SessionStore() {
subject = BehaviorSubject.create("wurst").toSerialized();
}
public void set(String session) {
subject.onNext(session);
}
public Observable<String> observe() {
return subject
.asObservable()
.doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
.distinctUntilChanged();
}
}
@Test
public void name() throws Exception {
// init
SessionStore sessionStore = new SessionStore();
TestSubscriber testSubscriber = new TestSubscriber();
Subscription subscribe = sessionStore
.observe()
.flatMap(s -> {
return Observable.fromCallable(() -> {
System.out.println("flatMap Thread:: " + Thread.currentThread());
return s;
}).subscribeOn(Schedulers.io());
})
.doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
.observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
.subscribe(testSubscriber); // Do UI-Stuff in subscribe
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("123");
}).start();
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("345");
}).start();
boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
Assert.assertTrue(b);
}
输出::
Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]