Spring AMQP, CorrelationId and GZipPostProcessor: UnsupportedEncodingException

I have a project that uses Spring AMQP (1.7.12.RELEASE). If I set a value for the correlationId field (getMessageProperties().setCorrelationId) and use GZipPostProcessor, I always get the following error:

"org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip"

The following code seems to work around the problem:

DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);

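However, I don't know what effect this has on clients that do not use Spring AMQP (I set this field when the message that reaches me carries one). Here is a complete code example: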

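import java.nio.charset.StandardCharsets;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration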
public class SimpleProducerGZIP 
{
    static final String queueName = "spring-boot";

    @Bean
    public CachingConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost("localhost");
        factory.setAutomaticRecoveryEnabled(false);
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin ;
    }

    @Bean
    Queue queue() {
        Queue qr = new Queue(queueName, false);
        qr.setAdminsThatShouldDeclare(amqpAdmin());
        return qr;
    }

    @Bean 
    public RabbitTemplate rabbitTemplate() 
    {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setEncoding("gzip");
        template.setBeforePublishPostProcessors(new GZipPostProcessor());

         // TODO : 
        DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
        messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
        template.setMessagePropertiesConverter(messageConverter);

        return template;
    }

    public static void main(String[] args) 
    {
        @SuppressWarnings("resource")
        ApplicationContext context = new AnnotationConfigApplicationContext(SimpleProducerGZIP.class);
        RabbitTemplate _rabbitTemplate = context.getBean(RabbitTemplate.class);
        int contador = 0;
        try {
            while(true) 
            {
                contador = contador + 1;
                int _nContador = contador;
                System.out.println("\nInicio envio : " + _nContador);
                Object _o = new String(("New Message : " + contador));
                try
                {
                    _rabbitTemplate.convertAndSend(queueName, _o,
                            new MessagePostProcessor() {
                                @SuppressWarnings("deprecation")
                                @Override
                                public Message postProcessMessage(Message msg) throws AmqpException {
                                    if(_nContador%2 == 0) {
                                        System.out.println("\t--- msg.getMessageProperties().setCorrelationId ");
                                        msg.getMessageProperties().setCorrelationId("NewCorrelation".getBytes(StandardCharsets.UTF_8));
                                    }
                                    return msg;
                                }
                            }
                    );   
                    System.out.println("\tOK");
                }catch (Exception e) {
                    System.err.println("\t\tError en envio : " + contador + " - " + e.getMessage());
                }

                System.out.println("Fin envio : " + contador);
                Thread.sleep(500);
            }
        }catch (Exception e) {
            System.err.println("Exception : " + e.getMessage());
        }
    }
}

The question is: if I change the rabbitTemplate configuration so that the error no longer occurs, will that affect clients that use Spring AMQP or other alternatives?

--- Edit (March 28, 2019): here is the complete stack trace from the code:

org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:211)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1531)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doInRabbit(RabbitTemplate.java:716)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1455)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:712)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:813)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:791)
    at es.jab.example.SimpleProducerGZIP.main(SimpleProducerGZIP.java:79)
Caused by: java.io.UnsupportedEncodingException: gzip
    at java.lang.StringCoding.decode(Unknown Source)
    at java.lang.String.<init>(Unknown Source)
    at java.lang.String.<init>(Unknown Source)
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:208)
    ... 8 more

I'd be interested to see the complete stack trace for more information about the problem.

This code was part of a transition from a byte[] correlation id to a String. It was needed to avoid a byte[]/String/byte[] conversion.

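When the policy is STRING, you should use the correlationIdString property instead of correlationId. Otherwise, the correlationId won't be mapped to the outbound message (we don't look at correlationId in that case). For inbound messages, the policy controls which property is populated.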
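As a rough sketch of that advice for the 1.7.x line: the converter call below is copied from the question, while `setCorrelationIdString` is assumed to be the setter behind the correlationIdString property mentioned above. The class and method names (`CorrelationIdStringSketch`, `useStringPolicy`, `send`) are made up for illustration, and the snippet needs Spring AMQP on the classpath.

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.CorrelationIdPolicy;

// Hypothetical helper class, not part of the original project.
public class CorrelationIdStringSketch {

    // Same converter setup as in the question: STRING policy on the template.
    static void useStringPolicy(RabbitTemplate template) {
        DefaultMessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
        converter.setCorrelationIdAsString(CorrelationIdPolicy.STRING);
        template.setMessagePropertiesConverter(converter);
    }

    // With the STRING policy, set the String property rather than the byte[] one.
    // Assumption: setCorrelationIdString(String) is the setter for correlationIdString.
    static void send(RabbitTemplate template, String queueName, Object payload) {
        template.convertAndSend(queueName, payload, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message msg) {
                msg.getMessageProperties().setCorrelationIdString("NewCorrelation");
                return msg;
            }
        });
    }
}
```

On the wire the correlation id is a plain AMQP string property either way, so this choice only affects which Spring AMQP property is read and populated.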

In 2.0 and later, correlationId is a String rather than a byte[], so this setting is no longer needed.

EDIT

Now that I see the stack trace, this...

template.setEncoding("gzip");

...is wrong.

/**
 * The encoding to use when inter-converting between byte arrays and Strings in message properties.
 *
 * @param encoding the encoding to set
 */
public void setEncoding(String encoding) {
    this.encoding = encoding;
}

There is no Charset named gzip. This property has nothing to do with the message content; it is only used when converting byte[] to/from String. The default is UTF-8.
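The difference between a charset name and a compression format can be checked with the JDK alone. The sketch below is only an illustration (the class name `EncodingVsCompression` and its helper methods are invented here, and it does not use Spring AMQP): it looks up "gzip" as a charset, reproduces the same kind of `new String(bytes, charsetName)` call that appears in the stack trace, and separately compresses and decompresses a body with the gzip stream classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class EncodingVsCompression {

    // True if the JVM has a charset registered under this name.
    static boolean isCharset(String name) {
        return Charset.isSupported(name);
    }

    // Decodes bytes using a charset looked up by name; throws if the name is unknown.
    static String decode(byte[] bytes, String charsetName) throws UnsupportedEncodingException {
        return new String(bytes, charsetName);
    }

    // Compresses a byte array with gzip; this acts on content, not on text encoding.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    // Reverses gzip(), returning the original bytes.
    static byte[] gunzip(byte[] compressed) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] correlationId = "NewCorrelation".getBytes(StandardCharsets.UTF_8);

        System.out.println("UTF-8 is a charset: " + isCharset("UTF-8"));
        System.out.println("gzip is a charset:  " + isCharset("gzip"));

        try {
            decode(correlationId, "gzip");
        } catch (UnsupportedEncodingException e) {
            System.out.println("Decoding with \"gzip\" failed: " + e);
        }

        byte[] body = "New Message : 1".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = gunzip(gzip(body));
        System.out.println("gzip round trip intact: " + Arrays.equals(body, roundTrip));
    }
}
```

Read this way, the GZipPostProcessor takes care of compressing the body on its own, and leaving the template's encoding at its UTF-8 default (that is, dropping the `setEncoding("gzip")` call) should be enough for the byte[] correlation id to convert cleanly.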