在 Vert.x 中将 Observable 连接到 SingleInterop

Wiring an Observable to a SingleInterop in Vert.x

我无法弄清楚如何使用 SingleInterop 正确连接 Observables onNext() 调用。我也不确定如何在不阻塞主线程的情况下 return 预期结果。我的粗略尝试如下所示。任何帮助将不胜感激。

public class SomeClass {
  private ExecutorService executor = Executors.newSingleThreadExecutor;
  private ObservableOnSubscribe<MyCustomObj> disHandler;

  public SomeClass() {
    init();
  }

  /** Create an observable to listen to a data input stream **/
  private void init() {
    disHandler = emitter ->
      executor.submit(() -> {
        try {
          while (true) { 
            MyCustomObj mco = readFromDataInputStream();
            emitter.onNext(mco);
          }
          // Should never complete since always listening...
          emitter.onComplete();
        }
        catch(Exception e) {
          emitter.onError(e);
        }
      });
  }
  public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
    // Eventually the disHandler will send an onNext() as 
    // a result of doSomethingToBackend() call.

    Observable<String> observer = Observable.fromCallable(doSomethingToBackend(someObj));
    observer.subscribe(item -> item);

    return ???.to(SingleInterop.get());
  }
}

public class GraphQLVertx {

  public VertxDataFetcher<Single<String>> dataFetcherTest() {
    return new VertxDataFetcher<>((env, future) -> {
      try {
        future.complete(someClass.invokeSomethingAndGetResponseFromHandler("Blah");
      }
      catch(Exception e) {
        future.fail();
      }
    });
  }
}

observable 有一个辅助方法,returns 第一个发出的元素作为单个元素,或者是一个错误,所以如果你已经有一个 Observable<String> 你可以将它转换为 Single<String>像这样:

public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
    return Observable.fromCallable(doSomethingToBackend(someObj))
        .singleOrError();
  }