Get Base64 encoded GZipped String from RabbitMQ with Java

I have implemented a GzipUtil that works fine. It looks like this:

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

public class GzipUtil {
    public static void unzip(String putBase64EncodedGzippedStringHere) {
        // java.util.Base64 replaces the JDK-internal com.sun.org.apache.xml.internal.security
        // classes, which are not a supported API. The MIME decoder ignores line breaks in the input.
        byte[] compressed = Base64.getMimeDecoder().decode(putBase64EncodedGzippedStringHere);

        if ((compressed == null) || (compressed.length == 0)) {
            throw new IllegalArgumentException("Cannot unzip null or empty bytes");
        }
        if (!isZipped(compressed)) {
            System.out.println(compressed);
        }

        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressed)) {
            try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream)) {
                try (InputStreamReader inputStreamReader =
                             new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8)) {
                    try (BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
                        StringBuilder output = new StringBuilder();
                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            output.append(line);
                            System.out.println(output.toString());
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to unzip content", e);
        }
    }

    public static boolean isZipped(final byte[] compressed) {
        return (compressed[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
                && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
    }
}

Now I have some other code that consumes a RabbitMQ queue like this:

ConnectionFactory rabbitMqConnectionFactory = new ConnectionFactory();
rabbitMqConnectionFactory.setHost("MyHostname");
rabbitMqConnectionFactory.setPort(5672);
rabbitMqConnectionFactory.setUsername("MyUsername");
rabbitMqConnectionFactory.setPassword("MyPassword");
rabbitMqConnectionFactory.setVirtualHost("MyVirtualHost");

Connection physicalSocketConnectionToRabbitMq = rabbitMqConnectionFactory.newConnection();

Channel messageChannel = physicalSocketConnectionToRabbitMq.createChannel();
    
// if the queue already exists, it won't do anything, it just skips the operation
messageChannel.queueDeclare(
    "MyQueueName", //queue
    true, //durable
    false, //exclusive
    false, //autoDelete
    null //arguments
);


DeliverCallback deliverCallback = new DeliverCallback() {
    
    public void handle(String consumerTag, Delivery message) throws IOException {
        
        System.out.println("consumerTag=" + consumerTag);
        System.out.println("exchangeName=" +  message.getEnvelope().getExchange() );
        System.out.println("routingKey=" +  message.getEnvelope().getRoutingKey() );
        System.out.println("deliveryTag=" +  message.getEnvelope().getDeliveryTag() );

        byte[] data = message.getBody();
        
        if (data == null) {
            System.err.println("body is null");
        }
        else {
            System.err.println("body is not null: " + data);
            
            try {
                GzipUtil.unzip( new String(data) );
            }
            catch (Exception e) {
                System.err.println("Exception: " + e);
            }
        }
        
    }
};

messageChannel.basicConsume(
    "MyQueueName", 
    autoAcknowledge, 
    deliverCallback, 
    new CancelCallback() {
        public void handle(String consumerTag) throws IOException {
            //nothing to do
        }
    }
);

If I run this code, the output is as follows:

body is not null: [B@5f571234
consumerTag=XXX
exchangeName=XXX
routingKey=tomato_gzip_b64
deliveryTag=1

message.getBody() returns a byte array. As I understand it, I have to use new String to turn it into a decoded string:

byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
System.out.println("decodedBytes=" + decodedBytes);
String decodedString = new String(decodedBytes);
System.out.println("decodedString=" + decodedString);

It seems I am missing something, because when I call GzipUtil.unzip( new String(data) ), my GzipUtil returns nothing! Does anyone know why?

String payload = new String(message.getBody(), StandardCharsets.UTF_8);

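Even though the message content is a JSON file inside a GZIP archive, the payload is still read as a string, with UTF-8 given explicitly as the charset. The correct content then comes out and can be processed further.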
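
In case it helps, here is a minimal sketch of the whole decoding path. It assumes the body is Base64 text whose decoded bytes are gzip-compressed UTF-8, as described in the question. PayloadDecoder and sampleBody are hypothetical names made up for this example; they are not part of the RabbitMQ client or of the GzipUtil above. Unlike GzipUtil.unzip, which is void and only prints, decode returns the text so the caller can work with it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration only.
class PayloadDecoder {

    // Assumption: body holds Base64 text whose decoded bytes are gzip-compressed UTF-8.
    static String decode(byte[] body) {
        // Name the charset explicitly; new String(body) would use the platform default.
        String base64Text = new String(body, StandardCharsets.UTF_8).trim();
        byte[] compressed = Base64.getDecoder().decode(base64Text);

        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            // Return the text instead of printing it, so the caller can process it.
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to unzip content", e);
        }
    }

    // Builds a sample body (gzip, then Base64) purely to exercise decode().
    static byte[] sampleBody(String text) {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(raw)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The gzip stream is closed here, so the trailer has been written.
        return Base64.getEncoder().encode(raw.toByteArray());
    }

    public static void main(String[] args) {
        byte[] body = sampleBody("{\"vegetable\":\"tomato\"}");
        System.out.println(decode(body));
    }
}
```

Inside the DeliverCallback this would be called as PayloadDecoder.decode(message.getBody()). If the body turns out not to be Base64 text, Base64.getDecoder() throws an IllegalArgumentException, which is a quick way to find out what the producer actually sent.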

From this string you can build a JSON file again, and you can also compress it again.
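
For the way back, a rough sketch could look like this. It assumes the target format is the same as in the question (gzip first, then Base64); PayloadEncoder is a hypothetical name used only for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration only.
class PayloadEncoder {

    // Compresses the JSON text with gzip and returns the result as Base64 text.
    static String encode(String json) {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(raw)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to zip content", e);
        }
        // Read the bytes only after the gzip stream is closed, otherwise the trailer is missing.
        return Base64.getEncoder().encodeToString(raw.toByteArray());
    }

    public static void main(String[] args) {
        String encoded = PayloadEncoder.encode("{\"vegetable\":\"tomato\"}");
        System.out.println(encoded);
    }
}
```

The resulting string could then be published with getBytes(StandardCharsets.UTF_8) as the message body, so that both sides agree on the charset.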

Good hint from VGA (see the comment above)!