使用 pubSubReactiveFactory 重新启动无限 Flux
Restarting inifinite Flux on error with pubSubReactiveFactory
我正在开发一个使用反应堆库连接 Google pubsub 的应用程序。所以我有大量的消息。我希望它始终从队列中消费,无论发生什么:这意味着处理所有错误以免终止通量。我在考虑(不太可能)与 pubsub 的连接可能丢失或任何可能导致刚刚创建的 Flux 发出错误信号的事件。我想到了这个解决方案:
private final PubSubReactiveFactory pubSubReactiveFactory;
private final String requestSubscription;
private final Long requestPollTime;
private final Flux<AcknowledgeablePubsubMessage> requestFlux;
@Autowired
public FluxContainer(/* Field args...*/) {
// init stuff...
this.requestFlux = initRequestFlux();
}
private Flux<AcknowledgeablePubsubMessage> initRequestFlux() {
return pubSubReactiveFactory.poll(requestSubscription, requestPollTime);
.doOnError(e -> log.error("FATAL ERROR: could not retrieve message from queue. Resetting flux", e))
.onErrorResume(e -> initRequestFlux());
}
@EventListener(ApplicationReadyEvent.class)
public void configureFluxAndSubscribe() {
log.info("Setting up requestFlux...");
this.requestFlux
.doOnNext(AcknowledgeablePubsubMessage::ack)
// ...many more concatenated calls handling flux
}
有道理吗?我担心内存分配(我依靠 gc 来清理东西)。欢迎任何评论。
我认为您正在寻找的基本上是一个 Flux
,它会在任何情况下终止时自行重新启动,但处理订阅除外。在我的例子中,我有一个源可以从 Docker 守护进程生成无限事件,它可以“成功”断开连接
让 sourceFlux
成为提供您数据的流量,并且您希望在出错或完成时重新启动,但在订阅处理时停止。
- 创建恢复函数
Function<Throwable, Publisher<Integer>> recoverFromThrow =
throwable -> sourceFlux
- 创建一个可以从抛出中恢复的新通量
var recoveringFromThrowFlux =
sourceFlux.onErrorResume(recoverFromThrow);
- 创建一个 Flux 生成器,生成可以从投掷中恢复的通量。 (注意需要通用强制)
var foreverFlux =
Flux.<Flux<Integer>>generate((sink) -> sink.next(recoveringFromThrowFlux))
.flatMap(flux -> flux);
foreverFlux
是做自恢复的助焊剂
我正在开发一个使用反应堆库连接 Google pubsub 的应用程序。所以我有大量的消息。我希望它始终从队列中消费,无论发生什么:这意味着处理所有错误以免终止通量。我在考虑(不太可能)与 pubsub 的连接可能丢失或任何可能导致刚刚创建的 Flux 发出错误信号的事件。我想到了这个解决方案:
private final PubSubReactiveFactory pubSubReactiveFactory;
private final String requestSubscription;
private final Long requestPollTime;
private final Flux<AcknowledgeablePubsubMessage> requestFlux;
@Autowired
public FluxContainer(/* Field args...*/) {
// init stuff...
this.requestFlux = initRequestFlux();
}
private Flux<AcknowledgeablePubsubMessage> initRequestFlux() {
return pubSubReactiveFactory.poll(requestSubscription, requestPollTime);
.doOnError(e -> log.error("FATAL ERROR: could not retrieve message from queue. Resetting flux", e))
.onErrorResume(e -> initRequestFlux());
}
@EventListener(ApplicationReadyEvent.class)
public void configureFluxAndSubscribe() {
log.info("Setting up requestFlux...");
this.requestFlux
.doOnNext(AcknowledgeablePubsubMessage::ack)
// ...many more concatenated calls handling flux
}
有道理吗?我担心内存分配(我依靠 gc 来清理东西)。欢迎任何评论。
我认为您正在寻找的基本上是一个 Flux
,它会在任何情况下终止时自行重新启动,但处理订阅除外。在我的例子中,我有一个源可以从 Docker 守护进程生成无限事件,它可以“成功”断开连接
让 sourceFlux
成为提供您数据的流量,并且您希望在出错或完成时重新启动,但在订阅处理时停止。
- 创建恢复函数
Function<Throwable, Publisher<Integer>> recoverFromThrow = throwable -> sourceFlux
- 创建一个可以从抛出中恢复的新通量
var recoveringFromThrowFlux = sourceFlux.onErrorResume(recoverFromThrow);
- 创建一个 Flux 生成器,生成可以从投掷中恢复的通量。 (注意需要通用强制)
var foreverFlux = Flux.<Flux<Integer>>generate((sink) -> sink.next(recoveringFromThrowFlux)) .flatMap(flux -> flux);
foreverFlux
是做自恢复的助焊剂