如何在 Reactor 3 中编写运算符?

How to write operators in Reactor 3?

我通过实现 org.reactivestreams.Publisher 实现了 Reactor 运算符,如下所示。但是,我想知道这是否是使用 Reactor 的 正确方法™。手动实现订阅者看起来有点麻烦。而 Operators class 似乎在这方面没有帮助。

class MyOperator implements Publisher<Integer> {

    private final Publisher<Integer> source;

    public MyOperator(Publisher<Integer> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> target) {
        final Subscriber<Integer> wrappedSubscriber = createSubscriber(target);
        source.subscribe(wrappedSubscriber);
    }

    private Subscriber<Integer> createSubscriber(Subscriber<? super Integer> target) {
        return new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                target.onNext(integer + 1); // actual behaviour
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onError(Throwable t) {
                target.onError(t);
            }

            @Override
            public void onComplete() {
                target.onComplete();
            }
        };
    }
}

或者下面的例子是正确的方法™?

class MyCompactOperator implements Publisher<Integer> {

    final Flux<Integer> flux;

    public MyCompactOperator(Publisher<Integer> source) {
        flux = Flux.from(source).map(number -> number + 1);
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        flux.subscribe(subscriber);
    }
}

至少这需要更少的代码。

变体 3 以 Flux 作为来源,如 Simon Baslé 所建议:

class MyFluxOperator extends Flux<Integer> {

    private final Flux<Integer> source;

    public MyFluxOperator(Flux<Integer> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        source.map(number -> number + 1).subscribe(subscriber);
    }
}

所有实施都按预期工作:

Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
Flux.from(new MyOperator(source)).subscribe(System.out::println);

// for variant 3
new MyFluxOperator(source).subscribe(System.out::println);

我在第二行使用了一个Flux来避免另一个订阅者的实现。

输出:

2
3
4
5
6

问题:

看到你的第二个选项,你似乎认为你必须实施一个发布者。绝对不是这样(恰恰相反)。从 reactor Flux 源(或 Publisher + Reactor 的 Flux.from)开始,然后简单地链接 map.

编辑:澄清一下您不想创建任何 class,只需在您的主代码路径中创建:

  • 如果您的 source 已经是 FluxMono:

    Flux<Integer> incrementedSource = source.map(i -> i + 1);
    incrementedSource.subscribe(subscriber);
    
  • 如果你的source是另一种Publisher:

    Flux<Integer> incrementedSource = Flux.from(source).map(i -> i + 1);
    incrementedSource.subscribe(subscriber);
    

像 Reactor 这样的库的整个想法是为您提供可以直接组合的运算符,而无需编写 Publisher

如果您需要一种方法来共享代码,因为您经常将一组运算符应用于各种 Flux,请查看 transformcompose(以及 reference documentation).