Spring AMQP、CorrelationId 和 GZipPostProcessor:UnsupportedEncodingException
Spring AMQP, CorrelationId and GZipPostProcessor: UnsupportedEncodingException
我有一个带有 Spring AMQP (1.7.12.RELEASE) 的项目。
如果我为 correlationId 字段 (etMessageProperties ().SetCorrelationId) 赋值并使用 GZipPostProcessor,则始终会出现以下错误:
"org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip"
要解决这个问题,似乎可以使用以下代码:
DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);
但我不知道在不使用 Spring AMQP 的客户端上实际使用它会有什么影响(如果到达我的消息有它,我会建立这个字段)。
我附上一个完整的代码示例:
@Configuration
public class SimpleProducerGZIP
{
static final String queueName = "spring-boot";
@Bean
public CachingConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost("localhost");
factory.setAutomaticRecoveryEnabled(false);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin ;
}
@Bean
Queue queue() {
Queue qr = new Queue(queueName, false);
qr.setAdminsThatShouldDeclare(amqpAdmin());
return qr;
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setEncoding("gzip");
template.setBeforePublishPostProcessors(new GZipPostProcessor());
// TODO :
DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);
return template;
}
public static void main(String[] args)
{
@SuppressWarnings("resource")
ApplicationContext context = new AnnotationConfigApplicationContext(SimpleProducerGZIP.class);
RabbitTemplate _rabbitTemplate = context.getBean(RabbitTemplate.class);
int contador = 0;
try {
while(true)
{
contador = contador + 1;
int _nContador = contador;
System.out.println("\nInicio envio : " + _nContador);
Object _o = new String(("New Message : " + contador));
try
{
_rabbitTemplate.convertAndSend(queueName, _o,
new MessagePostProcessor() {
@SuppressWarnings("deprecation")
@Override
public Message postProcessMessage(Message msg) throws AmqpException {
if(_nContador%2 == 0) {
System.out.println("\t--- msg.getMessageProperties().setCorrelationId ");
msg.getMessageProperties().setCorrelationId("NewCorrelation".getBytes(StandardCharsets.UTF_8));
}
return msg;
}
}
);
System.out.println("\tOK");
}catch (Exception e) {
System.err.println("\t\tError en envio : " + contador + " - " + e.getMessage());
}
System.out.println("Fin envio : " + contador);
Thread.sleep(500);
}
}catch (Exception e) {
System.err.println("Exception : " + e.getMessage());
}
}
}
问题是,如果我更改 rabbitTemplate 的配置以使错误不会发生,它是否会对使用 Spring AMQP 或其他替代方案的客户端产生影响?
--- 编辑(2019 年 3 月 28 日)
这是带有代码的完整堆栈跟踪:
org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip
at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:211)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1531)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doInRabbit(RabbitTemplate.java:716)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1455)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:712)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:813)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:791)
at es.jab.example.SimpleProducerGZIP.main(SimpleProducerGZIP.java:79)
Caused by: java.io.UnsupportedEncodingException: gzip
at java.lang.StringCoding.decode(Unknown Source)
at java.lang.String.<init>(Unknown Source)
at java.lang.String.<init>(Unknown Source)
at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:208)
... 8 more
我有兴趣查看完整的堆栈跟踪以获取有关该问题的更多信息。
此代码是从 byte[]
相关 ID 到 String
转换的一部分。这是避免 byte[]/String/byte[]
转换所必需的。
当策略为 String
时,您应该使用 correlationIdString
属性 而不是 correlationId
。否则,correlationId
将不会映射到出站消息中(在这种情况下我们不会查看 correlationId
)。对于入站消息,它控制填充哪个 属性。
在 2.0 及更高版本中,correlationId
现在是 String
而不是 byte[]
,因此不再需要此设置。
编辑
现在我看到了堆栈跟踪,这...
template.setEncoding("gzip");
...错了。
/**
* The encoding to use when inter-converting between byte arrays and Strings in message properties.
*
* @param encoding the encoding to set
*/
public void setEncoding(String encoding) {
this.encoding = encoding;
}
没有 gzip
这样的 Charset
。这个属性与消息内容无关,只是在转换byte[]
to/fromString
时使用而已。默认为UTF-8
。
我有一个带有 Spring AMQP (1.7.12.RELEASE) 的项目。 如果我为 correlationId 字段 (etMessageProperties ().SetCorrelationId) 赋值并使用 GZipPostProcessor,则始终会出现以下错误:
"org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip"
要解决这个问题,似乎可以使用以下代码:
DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);
但我不知道在不使用 Spring AMQP 的客户端上实际使用它会有什么影响(如果到达我的消息有它,我会建立这个字段)。 我附上一个完整的代码示例:
@Configuration
public class SimpleProducerGZIP
{
static final String queueName = "spring-boot";
@Bean
public CachingConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost("localhost");
factory.setAutomaticRecoveryEnabled(false);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin ;
}
@Bean
Queue queue() {
Queue qr = new Queue(queueName, false);
qr.setAdminsThatShouldDeclare(amqpAdmin());
return qr;
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setEncoding("gzip");
template.setBeforePublishPostProcessors(new GZipPostProcessor());
// TODO :
DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);
return template;
}
public static void main(String[] args)
{
@SuppressWarnings("resource")
ApplicationContext context = new AnnotationConfigApplicationContext(SimpleProducerGZIP.class);
RabbitTemplate _rabbitTemplate = context.getBean(RabbitTemplate.class);
int contador = 0;
try {
while(true)
{
contador = contador + 1;
int _nContador = contador;
System.out.println("\nInicio envio : " + _nContador);
Object _o = new String(("New Message : " + contador));
try
{
_rabbitTemplate.convertAndSend(queueName, _o,
new MessagePostProcessor() {
@SuppressWarnings("deprecation")
@Override
public Message postProcessMessage(Message msg) throws AmqpException {
if(_nContador%2 == 0) {
System.out.println("\t--- msg.getMessageProperties().setCorrelationId ");
msg.getMessageProperties().setCorrelationId("NewCorrelation".getBytes(StandardCharsets.UTF_8));
}
return msg;
}
}
);
System.out.println("\tOK");
}catch (Exception e) {
System.err.println("\t\tError en envio : " + contador + " - " + e.getMessage());
}
System.out.println("Fin envio : " + contador);
Thread.sleep(500);
}
}catch (Exception e) {
System.err.println("Exception : " + e.getMessage());
}
}
}
问题是,如果我更改 rabbitTemplate 的配置以使错误不会发生,它是否会对使用 Spring AMQP 或其他替代方案的客户端产生影响?
--- 编辑(2019 年 3 月 28 日) 这是带有代码的完整堆栈跟踪:
org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip
at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:211)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1531)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doInRabbit(RabbitTemplate.java:716)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1455)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:712)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:813)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:791)
at es.jab.example.SimpleProducerGZIP.main(SimpleProducerGZIP.java:79)
Caused by: java.io.UnsupportedEncodingException: gzip
at java.lang.StringCoding.decode(Unknown Source)
at java.lang.String.<init>(Unknown Source)
at java.lang.String.<init>(Unknown Source)
at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:208)
... 8 more
我有兴趣查看完整的堆栈跟踪以获取有关该问题的更多信息。
此代码是从 byte[]
相关 ID 到 String
转换的一部分。这是避免 byte[]/String/byte[]
转换所必需的。
当策略为 String
时,您应该使用 correlationIdString
属性 而不是 correlationId
。否则,correlationId
将不会映射到出站消息中(在这种情况下我们不会查看 correlationId
)。对于入站消息,它控制填充哪个 属性。
在 2.0 及更高版本中,correlationId
现在是 String
而不是 byte[]
,因此不再需要此设置。
编辑
现在我看到了堆栈跟踪,这...
template.setEncoding("gzip");
...错了。
/**
* The encoding to use when inter-converting between byte arrays and Strings in message properties.
*
* @param encoding the encoding to set
*/
public void setEncoding(String encoding) {
this.encoding = encoding;
}
没有 gzip
这样的 Charset
。这个属性与消息内容无关,只是在转换byte[]
to/fromString
时使用而已。默认为UTF-8
。