Spring cloud stream rabbit binder : 向 DLQ 发送批处理消息时出错
Spring cloud stream rabbit binder : Error sending batch messages to DLQ
我正在尝试使用 rabbit binder 配置 spring-cloud-stream 应用程序
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
spring version : 2.5.5
下面是我的配置:
spring:
cloud:
stream:
rabbit:
bindings:
listen-in-0:
consumer:
queueNameGroupOnly: true
enable-batching: true
batch-size: 100
receive-timeout: 500
transacted: true
listen-out-0:
producer:
queueNameGroupOnly: true
transacted: true
batchingEnabled: false
enable-batching: false
bindings:
listen-in-0:
destination: test.request
group: test.request
consumer:
batch-mode: true
requiredGroups: test.request
maxAttempts: 1
listen-out-0:
destination: test.response
group: test.response
producer:
requiredGroups: test.response
我的 java 消费者代码:
@Bean
public Function<Message<List<Request>>, List<Message<Response>>> listen() {
...
}
}
如果没有错误发生,一切正常。但是当我模拟一个异常时,我在异常下得到了这里:
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy [] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer [] - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access00(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.handleMessage(RabbitMessageChannelBinder.java:663)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
... 21 common frames omitted
当 rabbit binder 尝试将消息发送回 DLQ 时会引发此错误。
确实,ErrorMessage 负载包含一个 List<Message<?>>
或 spring 云流兔子,期望 Message<List<?>>
.
是否有解决方法,或者我在配置或使用库时是否出错?
谢谢。
编辑 1:
好的,所以我不能将 DLQ 用于 spring-cloud-stream-rabbit-binder 的批处理消息侦听器。
我必须自己处理,所以我如何确保构成我的监听器的所有函数之间的事务:
但是为了自己处理异常,我必须处理构成我的侦听器的所有函数中的执行。我有一个由具有此配置的多功能组成的侦听器:
spring:
cloud:
function:
definition: heartbeat;listen|process|send
在 heartbeat
函数中处理事务没问题,但是我如何确保构成 listen|process|send
的所有函数之间的事务?
实际上,当我在 process
函数中使用 StreamBridge
发送消息时,我遇到了同样的问题。如果发送函数失败,我不想提交在“处理”函数中发送的消息。
这是我的 java 过程函数代码:
@Configuration
public class WorkerProcessor {
@Bean
public Function<List<Request>, List<Request>> process(Service service) {
return (requests) -> service.run(requests);
}
}
@Component
@Transactional
public class Service {
StreamBridge bridge;
IWorker worker;
public Service(StreamBridge bridge, IWorker worker) {
this.bridge = bridge;
this.worker = worker;
}
@Transactional
public List<Request> run(List<Request> requests) {
requests.forEach(request -> {
worker.process(request, bridge);
});
return requests;
}
}
目前不支持批量侦听器在活页夹中重新发布死信。将 republishToDlq
设置为 false。
很难说它是否应该被支持(即整个批次将被发送,即使一些记录被成功处理),但它不应该抛出 ClassCastException
.
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348
对于批处理侦听器,最好在侦听器本身中处理异常。
我正在尝试使用 rabbit binder 配置 spring-cloud-stream 应用程序
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
spring version : 2.5.5
下面是我的配置:
spring:
cloud:
stream:
rabbit:
bindings:
listen-in-0:
consumer:
queueNameGroupOnly: true
enable-batching: true
batch-size: 100
receive-timeout: 500
transacted: true
listen-out-0:
producer:
queueNameGroupOnly: true
transacted: true
batchingEnabled: false
enable-batching: false
bindings:
listen-in-0:
destination: test.request
group: test.request
consumer:
batch-mode: true
requiredGroups: test.request
maxAttempts: 1
listen-out-0:
destination: test.response
group: test.response
producer:
requiredGroups: test.response
我的 java 消费者代码:
@Bean
public Function<Message<List<Request>>, List<Message<Response>>> listen() {
...
}
}
如果没有错误发生,一切正常。但是当我模拟一个异常时,我在异常下得到了这里:
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy [] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer [] - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access00(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.handleMessage(RabbitMessageChannelBinder.java:663)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
... 21 common frames omitted
当 rabbit binder 尝试将消息发送回 DLQ 时会引发此错误。
确实,ErrorMessage 负载包含一个 List<Message<?>>
或 spring 云流兔子,期望 Message<List<?>>
.
是否有解决方法,或者我在配置或使用库时是否出错?
谢谢。
编辑 1:
好的,所以我不能将 DLQ 用于 spring-cloud-stream-rabbit-binder 的批处理消息侦听器。
我必须自己处理,所以我如何确保构成我的监听器的所有函数之间的事务:
但是为了自己处理异常,我必须处理构成我的侦听器的所有函数中的执行。我有一个由具有此配置的多功能组成的侦听器:
spring:
cloud:
function:
definition: heartbeat;listen|process|send
在 heartbeat
函数中处理事务没问题,但是我如何确保构成 listen|process|send
的所有函数之间的事务?
实际上,当我在 process
函数中使用 StreamBridge
发送消息时,我遇到了同样的问题。如果发送函数失败,我不想提交在“处理”函数中发送的消息。
这是我的 java 过程函数代码:
@Configuration
public class WorkerProcessor {
@Bean
public Function<List<Request>, List<Request>> process(Service service) {
return (requests) -> service.run(requests);
}
}
@Component
@Transactional
public class Service {
StreamBridge bridge;
IWorker worker;
public Service(StreamBridge bridge, IWorker worker) {
this.bridge = bridge;
this.worker = worker;
}
@Transactional
public List<Request> run(List<Request> requests) {
requests.forEach(request -> {
worker.process(request, bridge);
});
return requests;
}
}
目前不支持批量侦听器在活页夹中重新发布死信。将 republishToDlq
设置为 false。
很难说它是否应该被支持(即整个批次将被发送,即使一些记录被成功处理),但它不应该抛出 ClassCastException
.
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348
对于批处理侦听器,最好在侦听器本身中处理异常。