使用通量通道的反应式 DSL 错误处理
Reactive DSL Error Handling with Flux Channels
我有一个从 SQS 读取的部分反应流,执行一些逻辑,保存到数据库 (R2DBC)。该流在反应通道上运行,该通道是 SqsMessageDrivenChannelAdapter
.
的入站通道
问题:
handle
方法 (.handle((payload, header) -> validator.validate((Dto) payload))
) 中抛出的异常不会到达 flowErrorChannel
。 errorProcessingFlow
未触发,我需要 errorProcessingFlow 记录并将异常抛出到 SimpleMessageListenerContainer
。
如果我将 objectChannel
和 flowErrorChannel
从 flux
更改为 direct
,效果与预期相同,但对于 Flux 通道则不然。对于通量通道,异常甚至不会传播到 SimpleMessageListenerContainer
,因为它不会根据 SQS 队列配置触发重新驱动,仅记录来自 handle
的异常。
这里是异常和流程配置:
2021-05-28 12:40:34.772 ERROR 59097 --- [enerContainer-2] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [*********]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:280)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:261)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:199)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:79)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:68)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter.access0(SqsMessageDrivenChannelAdapter.java:60)
at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsMessageDrivenChannelAdapter.java:194)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:228)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:418)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: javax.validation.ConstraintViolationException: XXXXXX
at validator.validate(Validator.java:33)
@Bean
public IntegrationFlow validationAndProcessingFlow() {
return IntegrationFlows.from(objectChannel())
.log(
Level.INFO,
message -> "Object Channel Received Message : " + message.getPayload())
.transform(Transformers.fromJson(Dto.class))
.handle((payload, header) -> validator.validate((Dto) payload))
.handle((payload, header) -> mapper.toMyObject(Dto) payload))
.handle((payload, header) -> service.process((MyObject) payload))
.handle(
(payload, header) ->
adapter
.save((MyObject) payload)
.as(create(transactionManager)::transactional))
.log(
Level.INFO,
message -> "Persisted Message : " + message.getPayload())
.get();
}
@Bean
public MessageProducer createSqsMessageDrivenChannelAdapter(
@Value("${queue.inbound.name}") final String queueName,
@Value("${queue.inbound.visibility-timeout}") final Integer visibilityTimeout,
@Value("${queue.inbound.wait-timeout}") final Integer waitTimeout,
@Value("${queue.inbound.max-number-of-messages}")
final Integer maxNumberMessages) {
SqsMessageDrivenChannelAdapter adapter =
new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
adapter.setVisibilityTimeout(visibilityTimeout);
adapter.setWaitTimeOut(waitTimeout);
adapter.setAutoStartup(true);
adapter.setMaxNumberOfMessages(maxNumberMessages);
adapter.setErrorChannel(flowErrorChannel());
adapter.setOutputChannel(objectChannel());
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NO_REDRIVE);
return adapter;
}
@Bean
public IntegrationFlow errorProcessingFlow() {
return IntegrationFlows.from(flowErrorChannel())
.log(Level.ERROR)
.handle(
m -> {
throw (RuntimeException) (m.getPayload());
})
.get();
}
@Bean
public MessageChannel objectChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel flowErrorChannel() {
return MessageChannels.flux().get();
}
该行为是预期的:只要我们切换到异步处理,我们就会在消息源中丢失 try..catch
功能。
请详细了解 Reactor 中的错误处理:https://projectreactor.io/docs/core/release/reference/#error.handling。
在我们为 SQS 提出一些反应性解决方案之前,除非您将该频道转回直接频道,否则无法解决您的问题。
我有一个从 SQS 读取的部分反应流,执行一些逻辑,保存到数据库 (R2DBC)。该流在反应通道上运行,该通道是 SqsMessageDrivenChannelAdapter
.
问题:
handle
方法 (.handle((payload, header) -> validator.validate((Dto) payload))
) 中抛出的异常不会到达 flowErrorChannel
。 errorProcessingFlow
未触发,我需要 errorProcessingFlow 记录并将异常抛出到 SimpleMessageListenerContainer
。
如果我将 objectChannel
和 flowErrorChannel
从 flux
更改为 direct
,效果与预期相同,但对于 Flux 通道则不然。对于通量通道,异常甚至不会传播到 SimpleMessageListenerContainer
,因为它不会根据 SQS 队列配置触发重新驱动,仅记录来自 handle
的异常。
这里是异常和流程配置:
2021-05-28 12:40:34.772 ERROR 59097 --- [enerContainer-2] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [*********]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:280)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:261)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:199)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:79)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:68)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter.access0(SqsMessageDrivenChannelAdapter.java:60)
at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsMessageDrivenChannelAdapter.java:194)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:228)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:418)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: javax.validation.ConstraintViolationException: XXXXXX
at validator.validate(Validator.java:33)
@Bean
public IntegrationFlow validationAndProcessingFlow() {
return IntegrationFlows.from(objectChannel())
.log(
Level.INFO,
message -> "Object Channel Received Message : " + message.getPayload())
.transform(Transformers.fromJson(Dto.class))
.handle((payload, header) -> validator.validate((Dto) payload))
.handle((payload, header) -> mapper.toMyObject(Dto) payload))
.handle((payload, header) -> service.process((MyObject) payload))
.handle(
(payload, header) ->
adapter
.save((MyObject) payload)
.as(create(transactionManager)::transactional))
.log(
Level.INFO,
message -> "Persisted Message : " + message.getPayload())
.get();
}
@Bean
public MessageProducer createSqsMessageDrivenChannelAdapter(
@Value("${queue.inbound.name}") final String queueName,
@Value("${queue.inbound.visibility-timeout}") final Integer visibilityTimeout,
@Value("${queue.inbound.wait-timeout}") final Integer waitTimeout,
@Value("${queue.inbound.max-number-of-messages}")
final Integer maxNumberMessages) {
SqsMessageDrivenChannelAdapter adapter =
new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
adapter.setVisibilityTimeout(visibilityTimeout);
adapter.setWaitTimeOut(waitTimeout);
adapter.setAutoStartup(true);
adapter.setMaxNumberOfMessages(maxNumberMessages);
adapter.setErrorChannel(flowErrorChannel());
adapter.setOutputChannel(objectChannel());
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NO_REDRIVE);
return adapter;
}
@Bean
public IntegrationFlow errorProcessingFlow() {
return IntegrationFlows.from(flowErrorChannel())
.log(Level.ERROR)
.handle(
m -> {
throw (RuntimeException) (m.getPayload());
})
.get();
}
@Bean
public MessageChannel objectChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel flowErrorChannel() {
return MessageChannels.flux().get();
}
该行为是预期的:只要我们切换到异步处理,我们就会在消息源中丢失 try..catch
功能。
请详细了解 Reactor 中的错误处理:https://projectreactor.io/docs/core/release/reference/#error.handling。
在我们为 SQS 提出一些反应性解决方案之前,除非您将该频道转回直接频道,否则无法解决您的问题。