使用通量通道的反应式 DSL 错误处理

Reactive DSL Error Handling with Flux Channels

我有一个从 SQS 读取的部分反应流,执行一些逻辑,保存到数据库 (R2DBC)。该流在反应通道上运行,该通道是 SqsMessageDrivenChannelAdapter.

的入站通道

问题:

handle 方法 (.handle((payload, header) -> validator.validate((Dto) payload)) ) 中抛出的异常不会到达 flowErrorChannelerrorProcessingFlow 未触发,我需要 errorProcessingFlow 记录并将异常抛出到 SimpleMessageListenerContainer

如果我将 objectChannelflowErrorChannelflux 更改为 direct,效果与预期相同,但对于 Flux 通道则不然。对于通量通道,异常甚至不会传播到 SimpleMessageListenerContainer,因为它不会根据 SQS 队列配置触发重新驱动,仅记录来自 handle 的异常。

这里是异常和流程配置:

2021-05-28 12:40:34.772 ERROR 59097 --- [enerContainer-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [*********]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:280)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:261)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:199)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:79)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:68)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter.access0(SqsMessageDrivenChannelAdapter.java:60)
    at org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsMessageDrivenChannelAdapter.java:194)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:228)
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:418)
    at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: javax.validation.ConstraintViolationException: XXXXXX
    at validator.validate(Validator.java:33)


 @Bean
    public IntegrationFlow validationAndProcessingFlow() {
        return IntegrationFlows.from(objectChannel())
                .log(
                        Level.INFO,
                        message -> "Object Channel Received Message : " + message.getPayload())
                .transform(Transformers.fromJson(Dto.class))
                .handle((payload, header) -> validator.validate((Dto) payload))
                .handle((payload, header) -> mapper.toMyObject(Dto) payload))
                .handle((payload, header) -> service.process((MyObject) payload))
                .handle(
                        (payload, header) ->
                                adapter
                                        .save((MyObject) payload)
                                        .as(create(transactionManager)::transactional))
                .log(
                        Level.INFO,
                        message -> "Persisted Message : " + message.getPayload())
                .get();
    }


    @Bean
    public MessageProducer createSqsMessageDrivenChannelAdapter(
            @Value("${queue.inbound.name}") final String queueName,
            @Value("${queue.inbound.visibility-timeout}") final Integer visibilityTimeout,
            @Value("${queue.inbound.wait-timeout}") final Integer waitTimeout,
            @Value("${queue.inbound.max-number-of-messages}")
                    final Integer maxNumberMessages) {
        SqsMessageDrivenChannelAdapter adapter =
                new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
        adapter.setVisibilityTimeout(visibilityTimeout);
        adapter.setWaitTimeOut(waitTimeout);
        adapter.setAutoStartup(true);
        adapter.setMaxNumberOfMessages(maxNumberMessages);
        adapter.setErrorChannel(flowErrorChannel());
        adapter.setOutputChannel(objectChannel());
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NO_REDRIVE);
        return adapter;
    }

    @Bean
    public IntegrationFlow errorProcessingFlow() {

    return IntegrationFlows.from(flowErrorChannel())
        .log(Level.ERROR)
        .handle(
            m -> {
              throw (RuntimeException) (m.getPayload());
            })
        .get();
    }

    @Bean
    public MessageChannel objectChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel flowErrorChannel() {
        return MessageChannels.flux().get();
    }

该行为是预期的:只要我们切换到异步处理,我们就会在消息源中丢失 try..catch 功能。

请详细了解 Reactor 中的错误处理:https://projectreactor.io/docs/core/release/reference/#error.handling

在我们为 SQS 提出一些反应性解决方案之前,除非您将该频道转回直接频道,否则无法解决您的问题。