运行 在不同线程 rxJava 上发布主题

run PublishSubject on different thread rxJava

我正在 运行ning RxJava 并创建一个主题以使用 onNext() 方法来生成数据。我正在使用 Spring

这是我的设置:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}

在 RxJava 流上生成新数据的方式是 @Autowire private SubjectObserver subjectObserver 然后调用 subjectObserver.publish(newDataObjGenerated)

无论我为 subscribeOn() & observeOn() 指定什么:

onNext() 及其内部的实际工作是在同一个线程上完成的,该线程实际调用主题上的 onNext() 到 generate/produce 数据。

这是正确的吗?如果是这样,我错过了什么?我期待 doSomething() 在不同的线程上完成。

更新

在我的调用 class 中,如果我改变调用 publish 方法的方式,那么当然会为 运行 上的订阅者分配一个新线程。

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));

谢谢,

Observable/Subject 上的每个运算符 return 一个具有额外行为的新实例,但是,您的代码仅应用 subscribeOnobserveOn然后扔掉他们生产的任何东西并订阅原始 Subject。您应该链接方法调用然后订阅:

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();

safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
     @Override
     public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
         LOGGER.debug("{} invoked.", Thread.currentThread().getName());
         doSomething();
     }
});

请注意,subscribeOnPublishSubject 没有实际影响,因为在其 subscribe() 方法中没有发生订阅副作用。