Reply Message post processing in AsyncRabbitTemplate
I'm running into an issue with GZip/GUnzip message processing when using AsyncRabbitTemplate.
A synchronous template set up like this works fine:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jsonConverter());
    rabbitTemplate.setReplyTimeout(config.getRabbitSendAndReceiveReplyTimeout());
    rabbitTemplate.setReceiveTimeout(config.getRabbitSendAndReceiveReceiveTimeout());
    rabbitTemplate.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    rabbitTemplate.setBeforePublishPostProcessors(new GZipPostProcessor(true));
    return rabbitTemplate;
}
However, when I set up the async template like this:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public AsyncRabbitTemplate rabbitTemplateAsync(final ConnectionFactory connectionFactory) {
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory));
    // need to manually start the reply listener container for some reason
    asyncRabbitTemplate.start();
    return asyncRabbitTemplate;
}
the reply message is not decompressed properly, and I get this error:
Caused by: java.io.UnsupportedEncodingException: gzip:UTF-8
    at java.lang.StringCoding.decode(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:235) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:199) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate.onMessage(AsyncRabbitTemplate.java:576) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
I have tried giving the AsyncRabbitTemplate a pre-configured DirectReplyToMessageListenerContainer, but that did not help:
final DirectReplyToMessageListenerContainer directReplyToMessageListenerContainer =
        new DirectReplyToMessageListenerContainer(connectionFactory);
directReplyToMessageListenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
        directReplyToMessageListenerContainer);
That only produces this error:
[ERROR] 2019-03-06 12:18:05.192 [AMQP Connection 172.17.3.6:5672] CachingConnectionFactory.log - Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - fast reply consumer does not exist, class-id=60, method-id=40)
Note that I can get things working by forking the spring-rabbit project and adding this constructor to AsyncRabbitTemplate:
public IndigoAsyncRabbitTemplate(final RabbitTemplate template,
        final DirectReplyToMessageListenerContainer directReplyToContainer) {
    Assert.notNull(template, "'template' cannot be null");
    this.template = template;
    container = null;
    replyAddress = null;
    this.directReplyToContainer = directReplyToContainer;
    directReplyToContainer.setMessageListener(this);
}
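So, is this going to require an enhancement to the spring rabbit library to get working? Or is there a way to get GUnzip working on the reply listener without jumping through too many hoops?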
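Right, this has to be an improvement in the framework. We simply missed the afterReceivePostProcessors in the case of AsyncRabbitTemplate. We could reconfigure the internal DirectReplyToMessageListenerContainer to use the afterReceivePostProcessors from the provided RabbitTemplate.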
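The `gzip:UTF-8` in the stack trace suggests that, when the unzip step is skipped, the converter ends up using a content-encoding value that still carries the compression marker as if it were a charset name. The following is a minimal, standard-library-only sketch of that failure mode and of what an after-receive step has to do. It does not use Spring AMQP; the class `GzipReplyDemo` and its helper methods are hypothetical names made up for illustration, and the `gzip:` prefix handling is an assumption inferred from the error message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipReplyDemo {

    /** Hypothetical stand-in for a before-publish step: gzip the body. */
    static byte[] gzip(byte[] body) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(body);
        }
        return out.toByteArray();
    }

    /** Hypothetical stand-in for an after-receive step: gunzip the body. */
    static byte[] gunzip(byte[] body) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(body));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gz.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        }
    }

    /** Drop a leading "gzip:" marker so only the original charset name remains. */
    static String stripGzipPrefix(String contentEncoding) {
        return contentEncoding.startsWith("gzip:")
                ? contentEncoding.substring("gzip:".length())
                : contentEncoding;
    }

    /** Mimics a converter that treats the content-encoding value as a charset name. */
    static String convert(byte[] body, String contentEncoding) throws UnsupportedEncodingException {
        return new String(body, contentEncoding);
    }

    public static void main(String[] args) throws IOException {
        String json = "{\"status\":\"ok\"}";
        byte[] wireBody = gzip(json.getBytes(StandardCharsets.UTF_8));
        String wireEncoding = "gzip:UTF-8"; // value taken from the stack trace

        // Without the unzip step, the marker is passed on as a charset name and rejected.
        try {
            convert(wireBody, wireEncoding);
        } catch (UnsupportedEncodingException e) {
            System.out.println("without unzip step: " + e);
        }

        // With the unzip step, the body is inflated and the marker removed before conversion.
        String decoded = convert(gunzip(wireBody), stripGzipPrefix(wireEncoding));
        System.out.println("with unzip step: " + decoded);
    }
}
```

The point of the sketch is only the ordering: decompression and header cleanup have to happen before the converter sees the reply, which is the step the async reply path appears to be missing here.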
In the meantime, you can stick with regular SimpleMessageListenerContainer injection. Or you can try external DirectReplyToMessageListenerContainer injection.
See this ctor:
/**
 * Construct an instance using the provided arguments. The first queue the container
 * is configured to listen to will be used as the reply queue. Replies will be
 * routed using the default exchange with that queue name as the routing key.
 * @param template a {@link RabbitTemplate}
 * @param container a {@link AbstractMessageListenerContainer}.
 */
public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
    this(template, container, null);
}
An issue on the matter: https://github.com/spring-projects/spring-amqp/issues/920