Non de-batching messages in Spring AMQP
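I am using BatchingRabbitTemplate to send messages in batches to an AMQP endpoint. On the receiving end I can use @RabbitListener to receive them, but my problem is that the messages are de-batched automatically, so I can't use @RabbitHandler public void receive(List<SomeObject> so). Is there an easier way to receive the messages without de-batching than what I am doing now: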
@RabbitListener(..., containerFactory = "nonDeBatchingContainerFactory")
@Bean
public RabbitListenerContainerFactory nonDeBatchingContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    // Turn off the container's built-in de-batching.
    factory.setDeBatchingEnabled(false);
    factory.setMessageConverter(jackson2JsonMessageConverter());
    factory.setAfterReceivePostProcessors(new NonDeBatchingMessagePostProcessor(jackson2JsonMessageConverter()));
    return factory;
}
and then implementing this post processor (which more or less copies the existing de-batching code):
public class NonDeBatchingMessagePostProcessor implements MessagePostProcessor {

    private final MessageConverter payloadConverter;

    public NonDeBatchingMessagePostProcessor(MessageConverter payloadConverter) {
        this.payloadConverter = payloadConverter;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
        if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat)) {
            List<Object> aggregatedObjects = new ArrayList<>();
            ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
            MessageProperties messageProperties = message.getMessageProperties();
            String singleObjectTypeId = messageProperties.getHeaders().get(DEFAULT_CLASSID_FIELD_NAME).toString();
            messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
            // Each fragment is preceded by a 4-byte length header.
            while (byteBuffer.hasRemaining()) {
                int length = byteBuffer.getInt();
                if (length < 0 || length > byteBuffer.remaining()) {
                    throw new ListenerExecutionFailedException("Bad batched message received",
                            new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
                            message);
                }
                byte[] body = new byte[length];
                byteBuffer.get(body);
                messageProperties.setContentLength(length);
                // Caveat - shared MessageProperties.
                Message fragment = new Message(body, messageProperties);
                Object singleObject = this.payloadConverter.fromMessage(fragment);
                aggregatedObjects.add(singleObject);
            }
            // Re-serialize the converted objects as a single list message.
            Message aggregatedMessages = this.payloadConverter.toMessage(aggregatedObjects, messageProperties);
            aggregatedMessages.getMessageProperties().getHeaders().put(DEFAULT_CONTENT_CLASSID_FIELD_NAME, singleObjectTypeId);
            return aggregatedMessages;
        }
        // Not a batch: pass the message through unchanged rather than returning null.
        return message;
    }
}
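For context, the loop above reads the batch body as a sequence of fragments, each preceded by its length as a 4-byte int. The following is only a minimal, stdlib-only sketch of that framing, assuming big-endian ints (the ByteBuffer default). The class name LengthPrefixedBatch and its methods are made up for illustration and are not part of Spring AMQP.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper (hypothetical, not a Spring AMQP class) for 4-byte length-prefixed framing. */
final class LengthPrefixedBatch {

    private LengthPrefixedBatch() {
    }

    /** Concatenates the fragments, writing each fragment's length as a 4-byte int before it. */
    static byte[] encode(List<byte[]> fragments) {
        int size = 0;
        for (byte[] fragment : fragments) {
            size += Integer.BYTES + fragment.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (byte[] fragment : fragments) {
            buffer.putInt(fragment.length);
            buffer.put(fragment);
        }
        return buffer.array();
    }

    /** Splits a framed body back into fragments, rejecting truncated or corrupt input. */
    static List<byte[]> decode(byte[] batch) {
        List<byte[]> fragments = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.wrap(batch);
        while (buffer.hasRemaining()) {
            if (buffer.remaining() < Integer.BYTES) {
                throw new IllegalArgumentException("Truncated length header at offset " + buffer.position());
            }
            int length = buffer.getInt();
            if (length < 0 || length > buffer.remaining()) {
                throw new IllegalArgumentException("Insufficient batch data at offset " + buffer.position());
            }
            byte[] body = new byte[length];
            buffer.get(body);
            fragments.add(body);
        }
        return fragments;
    }
}
```

Unlike the post processor, this sketch also checks that at least four bytes remain before reading a length, so a truncated header is reported as bad input instead of surfacing as a BufferUnderflowException.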
I need this for a use case where I receive all messages from RabbitMQ in bulk and then bulk-index them into Elasticsearch. Thanks.
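It is probably a bit easier to do the batching at the producing application level (send a List<SomeObject>) than to use the batching template. Then you don't need anything special on the consumer side at all.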
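As a rough sketch of what batching at the application level could look like on the producer side: collect items in a buffer and hand each full buffer to a sink as one List. The ListBatcher class below is a hypothetical helper, not something Spring AMQP provides, and the sink is left abstract so the example depends only on the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items and passes them to a sink as one List per batch. */
final class ListBatcher<T> {

    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    ListBatcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Adds an item and flushes automatically once the buffer holds batchSize items. */
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered as a single list; does nothing if the buffer is empty. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        sink.accept(batch);
    }
}
```

In a Spring AMQP application the sink might be something like batch -> rabbitTemplate.convertAndSend(exchange, routingKey, batch), where exchange and routingKey are placeholders. With a JSON message converter on both sides, the listener method could then take a List<SomeObject> parameter directly. A real producer would probably also call flush() on a timer so that a partially filled buffer does not wait indefinitely.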