如何从 Runnable 中创建 Observable?

How to create Observable out of Runnable?

有时我想触发 Runnable 作为我的 Observable 序列的一部分,但 Runnable 不报告进度。

我写了一个简单的工厂来将 Runnable 对象包装成 Observable:

public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}

用法:

Observable.concat(
    someTask, 
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));

但是 RxJava 2 是否已经提供了这个功能?

Observable没有这样的工厂方法,但是Completable可以从Runnable制作。所以你可以先创建一个 Completable 然后把它转换成 Observable:

Observable.concat(
    someTask, 
    Completable.fromRunnable(() -> {
        System.out.println("Done");
    }).toObservable()
);

更新:异常处理

Completable.fromRunnable 在内部捕获来自其 Runnable 的异常并将它们作为 onError 排放推送到流中。但是,如果您使用 Java,则必须自己处理 run() 方法中的检查异常。为避免这种情况,您可以使用 Callable 而不是 Runnable,因为其 call() 方法的签名声明它可以抛出异常。 Completable.fromCallable() 也将异常包装到 onError 排放中:

Observable.concat(
    someTask, 
    Completable.fromCallable(() -> {
        System.out.println("Done");
        return null;
    }).toObservable()
);

此外,Callable 可用于创建具有单个项目排放的 ObservableSingle

P.S.查看源码,这些方法很简单

P.P.S. Kotlin 没有检查异常 ;)


更新 2

还有 fromAction 用于创建 Completable 的工厂方法。它接受 Action 个对象。

A functional interface similar to Runnable but allows throwing a checked exception.

所以代码可以简化为:

Observable.concat(
    someTask, 
    Completable.fromAction(() -> {
        System.out.println("Done");
    }).toObservable()
);