Reactor - 如何在不丢弃元素的情况下重试热通量?
Reactor - how to retry on hot flux without dropping elements?
我有无穷无尽的数据热流。我即将对流中的每个元素执行操作,每个元素 returns 一个 Mono,它将在有限的时间后完成(以某种方式)。
这些操作可能会引发错误。如果是这样,我想重新订阅热通量而不遗漏任何东西,重试抛出错误时正在处理的元素(即任何未成功完成的元素)。
我在这里做什么?我可以容忍对相同元素的重复操作,但不能完全从流中丢失元素。
我已经尝试使用 ReplayProcessor 来处理这个问题,但是我看不出有什么方法可以让它在不重复很多可能已经成功的操作(使用非常保守的超时)或丢失的情况下工作由于新元素覆盖缓冲区中的旧元素而产生的元素(如下所示)。
测试用例:
@Test
public void fluxTest() {
List<String> strings = new ArrayList<>();
strings.add("one");
strings.add("two");
strings.add("three");
strings.add("four");
ConnectableFlux<String> flux = Flux.fromIterable(strings).publish();
//Goes boom after three uses of its method, otherwise
//returns a mono. completing after a little time
DangerousClass dangerousClass = new DangerousClass(3);
ReplayProcessor<String> replay = ReplayProcessor.create(3);
flux.subscribe(replay);
replay.flatMap(dangerousClass::doThis)
.retry(1)
.doOnNext(s -> LOG.info("Completed {}", s))
.subscribe();
flux.connect();
flux.blockLast();
}
public class DangerousClass {
Logger LOG = LoggerFactory.getLogger(DangerousClass.class);
private int boomCount;
private AtomicInteger count;
public DangerousClass(int boomCount) {
this.boomCount = boomCount;
this.count = new AtomicInteger(0);
}
public Mono<String> doThis(String s) {
return Mono.fromSupplier(() -> {
LOG.info("doing dangerous {}", s);
if (count.getAndIncrement() == boomCount) {
LOG.error("Throwing exception from {}", s);
throw new RuntimeException("Boom!");
}
return s;
}).delayElement(Duration.ofMillis(600));
}
}
这会打印:
doing dangerous one
doing dangerous two
doing dangerous three
doing dangerous four
Throwing exception from four
doing dangerous two
doing dangerous three
doing dangerous four
Completed four
Completed two
Completed three
一个永远不会完成。
错误(至少在上面的示例中)只能发生在 flatMap(dangerousClass::doThis)
调用中 - 因此重新订阅根 Flux
并在这个 flatMap()
调用发生时重播元素失败似乎有点奇怪,而且(可能)不是你想要做的。
相反,我建议放弃 ReplayProcessor
并只在内部 flatMap()
调用上调用重试,这样你最终会得到类似的结果:
ConnectableFlux<String> flux = Flux.range(1, 10).map(n -> "Entry " + n).publish();
DangerousClass dangerousClass = new DangerousClass(3);
flux.flatMap(x -> dangerousClass.doThis(x).retry(1))
.doOnNext(s -> System.out.println("Completed " + s))
.subscribe();
flux.connect();
这将为您提供如下内容,所有条目均已完成且无需重试:
doing dangerous Entry 1
doing dangerous Entry 2
doing dangerous Entry 3
doing dangerous Entry 4
Throwing exception from Entry 4
doing dangerous Entry 4
Completed Entry 2
Completed Entry 1
Completed Entry 3
Completed Entry 4
我有无穷无尽的数据热流。我即将对流中的每个元素执行操作,每个元素 returns 一个 Mono,它将在有限的时间后完成(以某种方式)。
这些操作可能会引发错误。如果是这样,我想重新订阅热通量而不遗漏任何东西,重试抛出错误时正在处理的元素(即任何未成功完成的元素)。
我在这里做什么?我可以容忍对相同元素的重复操作,但不能完全从流中丢失元素。
我已经尝试使用 ReplayProcessor 来处理这个问题,但是我看不出有什么方法可以让它在不重复很多可能已经成功的操作(使用非常保守的超时)或丢失的情况下工作由于新元素覆盖缓冲区中的旧元素而产生的元素(如下所示)。
测试用例:
@Test
public void fluxTest() {
List<String> strings = new ArrayList<>();
strings.add("one");
strings.add("two");
strings.add("three");
strings.add("four");
ConnectableFlux<String> flux = Flux.fromIterable(strings).publish();
//Goes boom after three uses of its method, otherwise
//returns a mono. completing after a little time
DangerousClass dangerousClass = new DangerousClass(3);
ReplayProcessor<String> replay = ReplayProcessor.create(3);
flux.subscribe(replay);
replay.flatMap(dangerousClass::doThis)
.retry(1)
.doOnNext(s -> LOG.info("Completed {}", s))
.subscribe();
flux.connect();
flux.blockLast();
}
public class DangerousClass {
Logger LOG = LoggerFactory.getLogger(DangerousClass.class);
private int boomCount;
private AtomicInteger count;
public DangerousClass(int boomCount) {
this.boomCount = boomCount;
this.count = new AtomicInteger(0);
}
public Mono<String> doThis(String s) {
return Mono.fromSupplier(() -> {
LOG.info("doing dangerous {}", s);
if (count.getAndIncrement() == boomCount) {
LOG.error("Throwing exception from {}", s);
throw new RuntimeException("Boom!");
}
return s;
}).delayElement(Duration.ofMillis(600));
}
}
这会打印:
doing dangerous one
doing dangerous two
doing dangerous three
doing dangerous four
Throwing exception from four
doing dangerous two
doing dangerous three
doing dangerous four
Completed four
Completed two
Completed three
一个永远不会完成。
错误(至少在上面的示例中)只能发生在 flatMap(dangerousClass::doThis)
调用中 - 因此重新订阅根 Flux
并在这个 flatMap()
调用发生时重播元素失败似乎有点奇怪,而且(可能)不是你想要做的。
相反,我建议放弃 ReplayProcessor
并只在内部 flatMap()
调用上调用重试,这样你最终会得到类似的结果:
ConnectableFlux<String> flux = Flux.range(1, 10).map(n -> "Entry " + n).publish();
DangerousClass dangerousClass = new DangerousClass(3);
flux.flatMap(x -> dangerousClass.doThis(x).retry(1))
.doOnNext(s -> System.out.println("Completed " + s))
.subscribe();
flux.connect();
这将为您提供如下内容,所有条目均已完成且无需重试:
doing dangerous Entry 1
doing dangerous Entry 2
doing dangerous Entry 3
doing dangerous Entry 4
Throwing exception from Entry 4
doing dangerous Entry 4
Completed Entry 2
Completed Entry 1
Completed Entry 3
Completed Entry 4