RxJava:如何表达doOnFirst()?

RxJava: How to express doOnFirst()?

我正在使用 RxJava,我有一个 Observable 里面有多个项目。我想做的是 运行 第一个项目上的函数 A,所有项目上的函数 B 和 Observable 完成时的函数 C:

-----1-----2-----3-----|-->
     |     |     |     |
     run A |     |     |
     |     |     |     |
     run B run B run B |
                       |
                       run C

有没有用 lambda 函数表达它的巧妙方法?我已经有了以下解决方案,但它看起来很难看,我怀疑有更好的方法来做到这一点:

observable.subscribe(
        new Action1<Item>() {
            boolean first = true;

            @Override
            public void call(Item item) {
                if (first) {
                    runA(item);
                    first = false;
                }
                runB(fax1);
            }
        },
        throwable -> {},
        () -> runC());

已经内置了 doOnNextdoOnTerminate(或类似的),所以听起来好像缺少的只是对第一项执行操作。这是一种方法。你可以发布你的流,然后在一个块中流正常进行,而在一个单独的订阅中,我们只监听第一个事件(first)并在收到它时执行操作。这是一个例子:

observable.publish(new Func1<Observable<Item>, Observable<Item>>() {
    @Override
    public Observable<Item> call(Observable<Item> itemObservable) {
        itemObservable.first().subscribe((Item item) -> runA(item));
        return itemObservable;
    }
}).subscribe(/* some actions and subscription as usual ... */);

如果这看起来太冗长,您可以将其放入实用程序 Transformer 中并保留一些构建器语法。例如:

public static class Transformers<T> implements Observable.Transformer<T, T> {
    private final Action1<T> action1;

    private Transformers(final Action1<T> action1) {
        this.action1 = action1;
    }
    // cover for generics
    public static <T> Observable.Transformer<T, T> doOnFirst(final Action1<T> action1) {
        return new Transformers<T>(action1);
    }

    @Override
    public Observable<T> call(final Observable<T> observable) {
        return observable.publish(new Func1<Observable<T>, Observable<T>>() {
            @Override
            public Observable<T> call(Observable<T> observable) {
                observable.first().subscribe(action1);
                return observable;
            }
        });
    }
}

那么你可以这样称呼它:

observable
    .compose(Transformers.doOnFirst((Item item) -> runA(item)))
    .subscribe(/* chain and subscribe as usual... */);

显然,使用 Lambda 语法这一切看起来更漂亮,以上是我的猜测。

使用 Observable.defer 封装每个订阅状态(是一个布尔值,指示我们是否在第一条记录上)。

这是演示使用的可运行 class:

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Observable<Integer> o = 
            Observable.just(1, 2, 3)
                .compose(doOnFirst(System.out::println);
        // will print 1
        o.subscribe();
        // will print 1
        o.subscribe();
    }

    public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
        return o -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return o.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    action.call(t);
                }
            });
        });
    }

}

尽管 OP 询问的是 RxJava1,但上面的解决方案与 RxJava2 相同:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Flowable<Integer> f =
                Flowable.just(1, 2, 3)
                        .compose(doOnFirst(System.out::println);
        // will print 1
        f.subscribe();
        // will print 1
        f.subscribe();
    }

    public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return f -> Flowable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return f.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }
}

我想我自己找到了一个简单的解决方案:

Observable<Integer> observable = Observable.just(1, 2, 3).share();
observable.take(1).subscribe(this::runA);
observable.subscribe(
    this::runB,
    throwable -> {},
    this::runC);

这可以在单线程下工作,而且似乎也可以在多线程上工作,但我不得不承认到目前为止我对此并不太自信。