Spring cloud stream rabbit binder : 向 DLQ 发送批处理消息时出错

Spring cloud stream rabbit binder : Error sending batch messages to DLQ

我正在尝试使用 rabbit binder 配置 spring-cloud-stream 应用程序

spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
spring version : 2.5.5

下面是我的配置:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          listen-in-0:
            consumer:
              queueNameGroupOnly: true
              enable-batching: true
              batch-size: 100
              receive-timeout: 500
              transacted: true
          listen-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
              batchingEnabled: false
              enable-batching: false
      bindings:
        listen-in-0:
          destination: test.request
          group: test.request
          consumer:
            batch-mode: true
            requiredGroups: test.request
            maxAttempts: 1
        listen-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response

我的 java 消费者代码:

@Bean
    public Function<Message<List<Request>>, List<Message<Response>>> listen() {
      ...
    }
}

如果没有错误发生,一切正常。但是当我模拟一个异常时,我在异常下得到了这里:

2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy []    - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer []    - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
    ... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access00(AmqpInboundChannelAdapter.java:69)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
    ... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.handleMessage(RabbitMessageChannelBinder.java:663)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    ... 21 common frames omitted

当 rabbit binder 尝试将消息发送回 DLQ 时会引发此错误。

确实,ErrorMessage 负载包含一个 List<Message<?>> 或 spring 云流兔子,期望 Message<List<?>>.

是否有解决方法,或者我在配置或使用库时是否出错?

谢谢。

编辑 1:

好的,所以我不能将 DLQ 用于 spring-cloud-stream-rabbit-binder 的批处理消息侦听器。

我必须自己处理,所以我如何确保构成我的监听器的所有函数之间的事务:

但是为了自己处理异常,我必须处理构成我的侦听器的所有函数中的执行。我有一个由具有此配置的多功能组成的侦听器:

spring:
  cloud:
    function:
      definition: heartbeat;listen|process|send

heartbeat 函数中处理事务没问题,但是我如何确保构成 listen|process|send 的所有函数之间的事务?

实际上,当我在 process 函数中使用 StreamBridge 发送消息时,我遇到了同样的问题。如果发送函数失败,我不想提交在“处理”函数中发送的消息。

这是我的 java 过程函数代码:


@Configuration
public class WorkerProcessor {

    @Bean
    public Function<List<Request>, List<Request>> process(Service service) {
        return (requests) -> service.run(requests);
    }
}
@Component
@Transactional
public class Service {
    StreamBridge bridge;
    IWorker worker;
    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }
    @Transactional
    public List<Request> run(List<Request> requests) {
        requests.forEach(request -> {
            worker.process(request, bridge);
        });
        return requests;
    }
}

目前不支持批量侦听器在活页夹中重新发布死信。将 republishToDlq 设置为 false。

很难说它是否应该被支持(即整个批次将被发送,即使一些记录被成功处理),但它不应该抛出 ClassCastException.

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348

对于批处理侦听器,最好在侦听器本身中处理异常。