将 ListenableFuture 链转换为等效的 RxJava 结构

Convert a ListenableFuture chain to the equivalent RxJava structure

使用 Guava Listenable Futures

假设我有以下 class:

public class FooService {
    ListenableFuture<Foo> getFoo() {
        //code to create callable, then
        return listeningExecutorService.submit(fooCallable);
    }
}

和以下 class:

public class BarService {
    ListenableFuture<Bar> getBar(Foo foo) {
        //code to create callable, then
        return listeningExecutorService.submit(barCallable);
    }
}

请注意 getBar 需要参数中的 Foo

如果我想将这两个操作链接在一起,我会像这样编写一个转换器函数:

AsyncFunction<Foo, Bar> fooToBar = new AsyncFunction<Foo, Bar>() {

     @Override
     ListenableFuture<Bar> apply(Foo resultantFoo) {
         return barService.get(resultantFoo);
     }
};

然后像这样应用转换:

public ListenableFuture<Bar> combinedFooToBar() {
     ListenableFuture<Foo> futureFoo = fooService.get();
     return Futures.transformAsync(futureFoo, fooToBar);
}

问题:如果我们将这些classes和转换函数转换成RxJava,等效的语法是什么?假设我们要将 FooServiceBarService 转换为适当的 RxJava 结构。假设我们想使用调用 FooService 的结果作为 BarService.

的参数来链接异步任务

注意:我刚刚开始学习 RxJava 语法。当我学习完语法后,我会尝试自己回答这个问题。不过,在此期间,如果有人想回答,欢迎他们。

Guava代码翻译成RxJava2代码如下:

FooService.java

public class FooService {

    Observable<Foo> getFoo() {
        return Observable.fromCallable(new Callable<Foo>() {
            @Override
            public Foo call() throws Exception {
                return new Foo();
            }
        });
    }
}

BarService.java

public class BarService {

    Observable<Bar> getBar(final Foo foo) {
        return Observable.fromCallable(new Callable<Bar>() {
            @Override
            public Bar call() throws Exception {
                return new Bar(foo);
            }
        });
    }
}

FooBarService.java

public class FooBarService {

    private final FooService fooService;
    private final BarService barService;

    public FooBarService(FooService fooService, BarService barService) {
        this.fooService = fooService;
        this.barService = barService;
    }

    Observable<Bar> getFooBar() {
        return fooService.getFoo()
                .concatMap(new Function<Foo, ObservableSource<? extends Bar>>() {
                    @Override
                    public ObservableSource<? extends Bar> apply(@NonNull Foo foo) throws Exception {
                        return barService.getBar(foo);
                    }
                });
    }
}

因此,concatMap and flatMap are similar to Futures.transformAsync and map 类似于 Futures.transform(非异步)。

另请注意这个名为 Future Converter 的 Github 项目,用于 ListenableFutureObservable 之间的转换。