Wiring an Observable to a SingleInterop in Vert.x
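I can't figure out how to correctly wire an Observable's onNext() calls to a SingleInterop. I'm also not sure how to return the expected result without blocking the main thread. My rough attempt is shown below. Any help would be greatly appreciated.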
public class SomeClass {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private ObservableOnSubscribe<MyCustomObj> disHandler;

    public SomeClass() {
        init();
    }

    /** Create an observable to listen to a data input stream **/
    private void init() {
        disHandler = emitter ->
            executor.submit(() -> {
                try {
                    while (true) {
                        MyCustomObj mco = readFromDataInputStream();
                        emitter.onNext(mco);
                    }
                    // Should never complete since always listening, so
                    // emitter.onComplete() is never reached.
                }
                catch (Exception e) {
                    emitter.onError(e);
                }
            });
    }

    public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
        // Eventually the disHandler will send an onNext() as
        // a result of doSomethingToBackend() call.
        Observable<String> observer = Observable.fromCallable(doSomethingToBackend(someObj));
        observer.subscribe(item -> item);
        return ???.to(SingleInterop.get());
    }
}
public class GraphQLVertx {
    public VertxDataFetcher<Single<String>> dataFetcherTest() {
        return new VertxDataFetcher<>((env, future) -> {
            try {
                future.complete(someClass.invokeSomethingAndGetResponseFromHandler("Blah"));
            }
            catch (Exception e) {
                // Pass the cause along so the failure is not silent.
                future.fail(e);
            }
        });
    }
}
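Observable has a helper method, singleOrError(), that returns the emitted element as a Single, or signals an error if the source is empty or emits more than one element (firstOrError() takes the first element instead). So if you already have an Observable<String>, you can convert it to a Single<String> like this: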
public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
    return Observable.fromCallable(doSomethingToBackend(someObj))
        .singleOrError();
}
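To make the "exactly one item, otherwise an error" behaviour concrete, here is a rough JDK-only sketch that uses CompletableFuture as a stand-in for Single. It is not RxJava's implementation: the class SingleOrErrorSketch, the helpers singleOrError and describe, and the use of an Iterable as the source are all assumptions made for illustration. The caller gets the future back immediately and the work runs on the executor, so the calling thread is not blocked.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SingleOrErrorSketch {

    // Hypothetical helper: drains `source` on `executor` and completes the
    // future with the only item, or fails if there are zero or several items.
    static <T> CompletableFuture<T> singleOrError(Iterable<T> source, ExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                Iterator<T> it = source.iterator();
                if (!it.hasNext()) {
                    result.completeExceptionally(new NoSuchElementException("no item emitted"));
                    return;
                }
                T only = it.next();
                if (it.hasNext()) {
                    result.completeExceptionally(
                        new IllegalArgumentException("more than one item emitted"));
                    return;
                }
                result.complete(only);
            } catch (RuntimeException e) {
                // Any failure while reading the source becomes the future's error.
                result.completeExceptionally(e);
            }
        });
        return result; // returned before the work has necessarily finished
    }

    // Hypothetical demo helper: waits for the outcome and renders it as text.
    static <T> String describe(CompletableFuture<T> future) {
        return future
            .handle((value, error) ->
                error == null ? String.valueOf(value) : error.getClass().getSimpleName())
            .join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(describe(singleOrError(List.of("response"), executor)));
            // prints "response"
            System.out.println(describe(singleOrError(List.<String>of(), executor)));
            // prints "NoSuchElementException"
            System.out.println(describe(singleOrError(List.of("a", "b"), executor)));
            // prints "IllegalArgumentException"
        } finally {
            executor.shutdown();
        }
    }
}
```

The join() inside describe blocks and is there only so the demo can print a result; in a handler you would attach a callback such as whenComplete instead. Note also that a source which never finishes, like the listening loop in the question, would never resolve under "single" semantics, which is why taking the first item may be the better fit there.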