使用 Java 从 RabbitMQ 获取 Base64 编码的 GZipped 字符串
Get Base64 encoded GZipped String from RabbitMQ with Java
我已经实现了一些 GzipUtil,它工作得很好,看起来像这样:
import com.sun.org.apache.xml.internal.security.exceptions.Base64DecodingException;
import com.sun.org.apache.xml.internal.security.utils.Base64;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
public class GzipUtil {
public static void unzip(String putBase64EncodedGzippedStringHere) throws Base64DecodingException {
byte[] compressed = Base64.decode(putBase64EncodedGzippedStringHere);
if ((compressed == null) || (compressed.length == 0)) {
throw new IllegalArgumentException("Cannot unzip null or empty bytes");
}
if (!isZipped(compressed)) {
System.out.println(compressed);
}
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressed)) {
try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream)) {
try (InputStreamReader inputStreamReader =
new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8)) {
try (BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
StringBuilder output = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
output.append(line);
System.out.println(output.toString());
}
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to unzip content", e);
}
}
public static boolean isZipped(final byte[] compressed) {
return (compressed[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
&& (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
}
}
现在我有一些其他代码可以像这样使用 RabbitMQ 队列:
ConnectionFactory rabbitMqConnectionFactory = new ConnectionFactory();
rabbitMqConnectionFactory.setHost("MyHostname");
rabbitMqConnectionFactory.setPort(5672);
rabbitMqConnectionFactory.setUsername("MyUsername");
rabbitMqConnectionFactory.setPassword("MyPassword");
rabbitMqConnectionFactory.setVirtualHost("MyVirtualHost");
Connection physicalSocketConnectionToRabbitMq = rabbitMqConnectionFactory.newConnection();
Channel messageChannel = physicalSocketConnectionToRabbitMq.createChannel();
// if the queue already exists, it won't do anything, it just skips the operation
messageChannel.queueDeclare(
"MyQueueName", //queue
true, //durable
false, //exclusive
false, //autoDelete
null //arguments
);
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("consumerTag=" + consumerTag);
System.out.println("exchangeName=" + message.getEnvelope().getExchange() );
System.out.println("routingKey=" + message.getEnvelope().getRoutingKey() );
System.out.println("deliveryTag=" + message.getEnvelope().getDeliveryTag() );
byte[] data = message.getBody();
if (data == null) {
System.err.println("body is null");
}
else {
System.err.println("body is not null: " + data);
try {
GzipUtil.unzip( new String(data) );
}
catch (Exception e) {
System.err.println("Exception: " + e);
}
}
}
};
messageChannel.basicConsume(
"MyQueueName",
autoAcknowledge,
deliverCallback,
new CancelCallback() {
public void handle(String consumerTag) throws IOException {
//nothing to do
}
}
);
如果我运行这段代码,输出如下:
body is not null: [B@5f571234
consumerTag=XXX
exchangeName=XXX
routingKey=tomato_gzip_b64
deliveryTag=1
message.getBody() returns 一些字节数组。据我了解,我必须使用 new String 使其成为解码字符串:
byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
System.out.println("decodedBytes=" + decodedBytes);
String decodedString = new String(decodedBytes);
System.out.println("decodedString=" + decodedString);
似乎我错过了什么,因为当我调用 GzipUtil.unzip( new String(data) ) 时,我的 GzipUtil returns 什么也没有;!有谁知道为什么?
String payload = new String(message.getBody(), StandardCharsets.UTF_8);
即使消息内容是 GZIP 存档中的 JSON 文件,有效负载仍被读取为字符串并使用 UTF-8 作为默认字符集。然后正确的内容在后面出来,可以进一步处理。
从这个字符串你可以再次制作一个 JSON 文件,也可以再次压缩。
来自 VGA 的好提示(上面的评论)!
我已经实现了一些 GzipUtil,它工作得很好,看起来像这样:
import com.sun.org.apache.xml.internal.security.exceptions.Base64DecodingException;
import com.sun.org.apache.xml.internal.security.utils.Base64;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
public class GzipUtil {
public static void unzip(String putBase64EncodedGzippedStringHere) throws Base64DecodingException {
byte[] compressed = Base64.decode(putBase64EncodedGzippedStringHere);
if ((compressed == null) || (compressed.length == 0)) {
throw new IllegalArgumentException("Cannot unzip null or empty bytes");
}
if (!isZipped(compressed)) {
System.out.println(compressed);
}
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressed)) {
try (GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream)) {
try (InputStreamReader inputStreamReader =
new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8)) {
try (BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
StringBuilder output = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
output.append(line);
System.out.println(output.toString());
}
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to unzip content", e);
}
}
public static boolean isZipped(final byte[] compressed) {
return (compressed[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
&& (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
}
}
现在我有一些其他代码可以像这样使用 RabbitMQ 队列:
ConnectionFactory rabbitMqConnectionFactory = new ConnectionFactory();
rabbitMqConnectionFactory.setHost("MyHostname");
rabbitMqConnectionFactory.setPort(5672);
rabbitMqConnectionFactory.setUsername("MyUsername");
rabbitMqConnectionFactory.setPassword("MyPassword");
rabbitMqConnectionFactory.setVirtualHost("MyVirtualHost");
Connection physicalSocketConnectionToRabbitMq = rabbitMqConnectionFactory.newConnection();
Channel messageChannel = physicalSocketConnectionToRabbitMq.createChannel();
// if the queue already exists, it won't do anything, it just skips the operation
messageChannel.queueDeclare(
"MyQueueName", //queue
true, //durable
false, //exclusive
false, //autoDelete
null //arguments
);
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("consumerTag=" + consumerTag);
System.out.println("exchangeName=" + message.getEnvelope().getExchange() );
System.out.println("routingKey=" + message.getEnvelope().getRoutingKey() );
System.out.println("deliveryTag=" + message.getEnvelope().getDeliveryTag() );
byte[] data = message.getBody();
if (data == null) {
System.err.println("body is null");
}
else {
System.err.println("body is not null: " + data);
try {
GzipUtil.unzip( new String(data) );
}
catch (Exception e) {
System.err.println("Exception: " + e);
}
}
}
};
messageChannel.basicConsume(
"MyQueueName",
autoAcknowledge,
deliverCallback,
new CancelCallback() {
public void handle(String consumerTag) throws IOException {
//nothing to do
}
}
);
如果我运行这段代码,输出如下:
body is not null: [B@5f571234
consumerTag=XXX
exchangeName=XXX
routingKey=tomato_gzip_b64
deliveryTag=1
message.getBody() returns 一些字节数组。据我了解,我必须使用 new String 使其成为解码字符串:
byte[] decodedBytes = Base64.getDecoder().decode(encodedString);
System.out.println("decodedBytes=" + decodedBytes);
String decodedString = new String(decodedBytes);
System.out.println("decodedString=" + decodedString);
似乎我错过了什么,因为当我调用 GzipUtil.unzip( new String(data) ) 时,我的 GzipUtil returns 什么也没有;!有谁知道为什么?
String payload = new String(message.getBody(), StandardCharsets.UTF_8);
即使消息内容是 GZIP 存档中的 JSON 文件,有效负载仍被读取为字符串并使用 UTF-8 作为默认字符集。然后正确的内容在后面出来,可以进一步处理。
从这个字符串你可以再次制作一个 JSON 文件,也可以再次压缩。
来自 VGA 的好提示(上面的评论)!