运行 在不同线程 rxJava 上发布主题
run PublishSubject on different thread rxJava
我正在 运行ning RxJava 并创建一个主题以使用 onNext()
方法来生成数据。我正在使用 Spring。
这是我的设置:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
在 RxJava 流上生成新数据的方式是 @Autowire private SubjectObserver subjectObserver
然后调用 subjectObserver.publish(newDataObjGenerated)
无论我为 subscribeOn()
& observeOn()
指定什么:
- Schedulers.io()
- Schedulers.computation()
- 我的帖子
- Schedulers.newThread
onNext()
及其内部的实际工作是在同一个线程上完成的,该线程实际调用主题上的 onNext()
到 generate/produce 数据。
这是正确的吗?如果是这样,我错过了什么?我期待 doSomething()
在不同的线程上完成。
更新
在我的调用 class 中,如果我改变调用 publish
方法的方式,那么当然会为 运行 上的订阅者分配一个新线程。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
谢谢,
Observable
/Subject
上的每个运算符 return 一个具有额外行为的新实例,但是,您的代码仅应用 subscribeOn
和 observeOn
然后扔掉他们生产的任何东西并订阅原始 Subject
。您应该链接方法调用然后订阅:
safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();
safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
});
请注意,subscribeOn
对 PublishSubject
没有实际影响,因为在其 subscribe()
方法中没有发生订阅副作用。
我正在 运行ning RxJava 并创建一个主题以使用 onNext()
方法来生成数据。我正在使用 Spring。
这是我的设置:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
在 RxJava 流上生成新数据的方式是 @Autowire private SubjectObserver subjectObserver
然后调用 subjectObserver.publish(newDataObjGenerated)
无论我为 subscribeOn()
& observeOn()
指定什么:
- Schedulers.io()
- Schedulers.computation()
- 我的帖子
- Schedulers.newThread
onNext()
及其内部的实际工作是在同一个线程上完成的,该线程实际调用主题上的 onNext()
到 generate/produce 数据。
这是正确的吗?如果是这样,我错过了什么?我期待 doSomething()
在不同的线程上完成。
更新
在我的调用 class 中,如果我改变调用 publish
方法的方式,那么当然会为 运行 上的订阅者分配一个新线程。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
谢谢,
Observable
/Subject
上的每个运算符 return 一个具有额外行为的新实例,但是,您的代码仅应用 subscribeOn
和 observeOn
然后扔掉他们生产的任何东西并订阅原始 Subject
。您应该链接方法调用然后订阅:
safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();
safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
});
请注意,subscribeOn
对 PublishSubject
没有实际影响,因为在其 subscribe()
方法中没有发生订阅副作用。