如何在 Reactor 3 中编写运算符?
How to write operators in Reactor 3?
我通过实现 org.reactivestreams.Publisher
实现了 Reactor 运算符,如下所示。但是,我想知道这是否是使用 Reactor 的 正确方法™。手动实现订阅者看起来有点麻烦。而 Operators class 似乎在这方面没有帮助。
class MyOperator implements Publisher<Integer> {
private final Publisher<Integer> source;
public MyOperator(Publisher<Integer> source) {
this.source = source;
}
@Override
public void subscribe(Subscriber<? super Integer> target) {
final Subscriber<Integer> wrappedSubscriber = createSubscriber(target);
source.subscribe(wrappedSubscriber);
}
private Subscriber<Integer> createSubscriber(Subscriber<? super Integer> target) {
return new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
target.onNext(integer + 1); // actual behaviour
subscription.request(Long.MAX_VALUE);
}
@Override
public void onError(Throwable t) {
target.onError(t);
}
@Override
public void onComplete() {
target.onComplete();
}
};
}
}
或者下面的例子是正确的方法™?
class MyCompactOperator implements Publisher<Integer> {
final Flux<Integer> flux;
public MyCompactOperator(Publisher<Integer> source) {
flux = Flux.from(source).map(number -> number + 1);
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
flux.subscribe(subscriber);
}
}
至少这需要更少的代码。
变体 3 以 Flux
作为来源,如 Simon Baslé 所建议:
class MyFluxOperator extends Flux<Integer> {
private final Flux<Integer> source;
public MyFluxOperator(Flux<Integer> source) {
this.source = source;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
source.map(number -> number + 1).subscribe(subscriber);
}
}
所有实施都按预期工作:
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
Flux.from(new MyOperator(source)).subscribe(System.out::println);
// for variant 3
new MyFluxOperator(source).subscribe(System.out::println);
我在第二行使用了一个Flux
来避免另一个订阅者的实现。
输出:
2
3
4
5
6
问题:
- 我的实现中是否缺少某些内容?
- 是否有更好的方法(代码更少、错误处理更好或其他)在 Reactor 3 中实现运算符?
- 两种方法之间是否存在相关的功能或非功能差异?
看到你的第二个选项,你似乎认为你必须实施一个发布者。绝对不是这样(恰恰相反)。从 reactor Flux 源(或 Publisher + Reactor 的 Flux.from)开始,然后简单地链接 map
.
编辑:澄清一下您不想创建任何 class,只需在您的主代码路径中创建:
如果您的 source
已经是 Flux
或 Mono
:
Flux<Integer> incrementedSource = source.map(i -> i + 1);
incrementedSource.subscribe(subscriber);
如果你的source
是另一种Publisher
:
Flux<Integer> incrementedSource = Flux.from(source).map(i -> i + 1);
incrementedSource.subscribe(subscriber);
像 Reactor 这样的库的整个想法是为您提供可以直接组合的运算符,而无需编写 Publisher
如果您需要一种方法来共享代码,因为您经常将一组运算符应用于各种 Flux
,请查看 transform
和 compose
(以及 reference documentation).
我通过实现 org.reactivestreams.Publisher
实现了 Reactor 运算符,如下所示。但是,我想知道这是否是使用 Reactor 的 正确方法™。手动实现订阅者看起来有点麻烦。而 Operators class 似乎在这方面没有帮助。
class MyOperator implements Publisher<Integer> {
private final Publisher<Integer> source;
public MyOperator(Publisher<Integer> source) {
this.source = source;
}
@Override
public void subscribe(Subscriber<? super Integer> target) {
final Subscriber<Integer> wrappedSubscriber = createSubscriber(target);
source.subscribe(wrappedSubscriber);
}
private Subscriber<Integer> createSubscriber(Subscriber<? super Integer> target) {
return new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
target.onNext(integer + 1); // actual behaviour
subscription.request(Long.MAX_VALUE);
}
@Override
public void onError(Throwable t) {
target.onError(t);
}
@Override
public void onComplete() {
target.onComplete();
}
};
}
}
或者下面的例子是正确的方法™?
class MyCompactOperator implements Publisher<Integer> {
final Flux<Integer> flux;
public MyCompactOperator(Publisher<Integer> source) {
flux = Flux.from(source).map(number -> number + 1);
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
flux.subscribe(subscriber);
}
}
至少这需要更少的代码。
变体 3 以 Flux
作为来源,如 Simon Baslé 所建议:
class MyFluxOperator extends Flux<Integer> {
private final Flux<Integer> source;
public MyFluxOperator(Flux<Integer> source) {
this.source = source;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
source.map(number -> number + 1).subscribe(subscriber);
}
}
所有实施都按预期工作:
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
Flux.from(new MyOperator(source)).subscribe(System.out::println);
// for variant 3
new MyFluxOperator(source).subscribe(System.out::println);
我在第二行使用了一个Flux
来避免另一个订阅者的实现。
输出:
2
3
4
5
6
问题:
- 我的实现中是否缺少某些内容?
- 是否有更好的方法(代码更少、错误处理更好或其他)在 Reactor 3 中实现运算符?
- 两种方法之间是否存在相关的功能或非功能差异?
看到你的第二个选项,你似乎认为你必须实施一个发布者。绝对不是这样(恰恰相反)。从 reactor Flux 源(或 Publisher + Reactor 的 Flux.from)开始,然后简单地链接 map
.
编辑:澄清一下您不想创建任何 class,只需在您的主代码路径中创建:
如果您的
source
已经是Flux
或Mono
:Flux<Integer> incrementedSource = source.map(i -> i + 1); incrementedSource.subscribe(subscriber);
如果你的
source
是另一种Publisher
:Flux<Integer> incrementedSource = Flux.from(source).map(i -> i + 1); incrementedSource.subscribe(subscriber);
像 Reactor 这样的库的整个想法是为您提供可以直接组合的运算符,而无需编写 Publisher
如果您需要一种方法来共享代码,因为您经常将一组运算符应用于各种 Flux
,请查看 transform
和 compose
(以及 reference documentation).