将 Base64 JSON 字符串反序列化为 Protobuf
Deserialize Base64 JSON string into Protobuf
我有一个 Kafka-Consumer,它从 Kafka Broker 接收 JSON 消息。 JSON 消息中的键“payload”有一个 Base64 Google 协议缓冲区对象存储为它的值。即对象消息使用 Google 协议缓冲区序列化并使用 Base64 编码并由 Kakfa-Producer 发送到 Kafka-Broker。
我可以从 Kafka-Consumer 收到 JSON 消息,但我在解码和反序列化具有键“payload”的对象时遇到困难。起初我将接收到的消息更改为 JSON 对象,然后我从键“payload”中获取了值。然后我用 Base64 解码器对值进行解码以生成字节数组。然后生成的字节数组用于创建一个 ByteArrayInputStream,然后将其作为参数传递给 ObjectInputStream 构造函数。最后,我将此 ObjectInputStream 转换为所需的 Google Protocol Buffer 对象以进行反序列化。
我在读取
对象输入流对象。我在下面包含了 json 消息格式和代码:
// JSON 消息
{
“消息时间戳”:“2021-08-01T12:10:07Z”,
"tenantId":"test123",
“objectUuid”:“b1602572-156e-4476-96d8-283b2d23ecfa”,
“deviceUuid”:“a512a4f0-f582-48a3-9001-64487f6c5288”,
“activationId”:“test3258”,
“有效载荷”:“Ej4KJGIxNjAyNTcyLTE1NmUtNDQ3Ni05NmQ4LTI4M2IyZDIzZWNmYRIWChRSw7x0dGVscGxhdHRlIEJPTUFHIBoJCgcI6MOuhrovIj8KBwjow66Gui8SCQkhv5FTThVIQBoJCfJ0wfNlaDBAIgAoAjADOgIIB0IJCQAAAOCjcOU/SgkJAAAAANej8D8qAhoAOgIKAEIMCgQItbsBEgQIvYMHShYKEGRpc3RhbmNlRnJvbUxhc3QSAhgCSg8KCWlzRHJpdmluZxICIAA="
}
//Kafka消费者代码
package com.kafka.project;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.primitives.Bytes;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
public class consumer {
public static void main(String[] args) {
// TODO Auto-generated method stub
Logger logger= LoggerFactory.getLogger(consumer.class.getName());
String bootstrapServers="localhost:9092";
String grp_id="test-id";
String topic="test";
String username = "admin";
String password = "admin";
//Creating consumer properties
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//creating consumer
KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties);
//NormalizedDeviceDataProtos.NormalizedDeviceData.getDefaultInstance().getPosition();
//Subscribing
consumer.subscribe(Arrays.asList(topic));
//polling
while(true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record: records){
JSONObject jsonObj = new JSONObject(record.value());
byte bytes = Base64.getDecoder().decode(json.getString("payload"));
ObjectInputStream in;
try {
in = new ObjectInputStream(new ByteArrayInputStream(bytes));
try {
//ExampleProtocolBuffer.messageData is a java class generated by "protoc" compiler and "messageData" is a message name in ".proto" file
ExampleProtocolBuffer.messageData message = (ExampleProtocolBuffer.messageData) in.readObject();
logger.info("Decoded FirstName: " + message.getName.getFirstName());
in.close();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
参考文档 - https://developers.google.com/protocol-buffers/docs/javatutorial#parsing-and-serialization
看看Parser#parseFrom(byte[] data)
,这是你的消息类型应该有的方法。
ObjectInputStream 用于 Java 序列化对象,而不是 Protobuf
我有一个 Kafka-Consumer,它从 Kafka Broker 接收 JSON 消息。 JSON 消息中的键“payload”有一个 Base64 Google 协议缓冲区对象存储为它的值。即对象消息使用 Google 协议缓冲区序列化并使用 Base64 编码并由 Kakfa-Producer 发送到 Kafka-Broker。
我可以从 Kafka-Consumer 收到 JSON 消息,但我在解码和反序列化具有键“payload”的对象时遇到困难。起初我将接收到的消息更改为 JSON 对象,然后我从键“payload”中获取了值。然后我用 Base64 解码器对值进行解码以生成字节数组。然后生成的字节数组用于创建一个 ByteArrayInputStream,然后将其作为参数传递给 ObjectInputStream 构造函数。最后,我将此 ObjectInputStream 转换为所需的 Google Protocol Buffer 对象以进行反序列化。
我在读取 对象输入流对象。我在下面包含了 json 消息格式和代码:
// JSON 消息 { “消息时间戳”:“2021-08-01T12:10:07Z”, "tenantId":"test123", “objectUuid”:“b1602572-156e-4476-96d8-283b2d23ecfa”, “deviceUuid”:“a512a4f0-f582-48a3-9001-64487f6c5288”, “activationId”:“test3258”, “有效载荷”:“Ej4KJGIxNjAyNTcyLTE1NmUtNDQ3Ni05NmQ4LTI4M2IyZDIzZWNmYRIWChRSw7x0dGVscGxhdHRlIEJPTUFHIBoJCgcI6MOuhrovIj8KBwjow66Gui8SCQkhv5FTThVIQBoJCfJ0wfNlaDBAIgAoAjADOgIIB0IJCQAAAOCjcOU/SgkJAAAAANej8D8qAhoAOgIKAEIMCgQItbsBEgQIvYMHShYKEGRpc3RhbmNlRnJvbUxhc3QSAhgCSg8KCWlzRHJpdmluZxICIAA=" }
//Kafka消费者代码
package com.kafka.project;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.primitives.Bytes;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
public class consumer {
public static void main(String[] args) {
// TODO Auto-generated method stub
Logger logger= LoggerFactory.getLogger(consumer.class.getName());
String bootstrapServers="localhost:9092";
String grp_id="test-id";
String topic="test";
String username = "admin";
String password = "admin";
//Creating consumer properties
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//creating consumer
KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties);
//NormalizedDeviceDataProtos.NormalizedDeviceData.getDefaultInstance().getPosition();
//Subscribing
consumer.subscribe(Arrays.asList(topic));
//polling
while(true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record: records){
JSONObject jsonObj = new JSONObject(record.value());
byte bytes = Base64.getDecoder().decode(json.getString("payload"));
ObjectInputStream in;
try {
in = new ObjectInputStream(new ByteArrayInputStream(bytes));
try {
//ExampleProtocolBuffer.messageData is a java class generated by "protoc" compiler and "messageData" is a message name in ".proto" file
ExampleProtocolBuffer.messageData message = (ExampleProtocolBuffer.messageData) in.readObject();
logger.info("Decoded FirstName: " + message.getName.getFirstName());
in.close();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
参考文档 - https://developers.google.com/protocol-buffers/docs/javatutorial#parsing-and-serialization
看看Parser#parseFrom(byte[] data)
,这是你的消息类型应该有的方法。
ObjectInputStream 用于 Java 序列化对象,而不是 Protobuf