Kafka consumer couldn't receive serialized object?
I want to implement a simple application that sends notifications from a Kafka producer to a consumer. So far I have managed to send String messages from the producer to the consumer successfully, but when I try to send a Notification object, the consumer doesn't receive anything. This is the code I'm using.
public class Notification implements Serializable {

    private String name;
    private String message;
    private long currentTimeStamp;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public long getCurrentTimeStamp() {
        return currentTimeStamp;
    }

    public void setCurrentTimeStamp(long currentTimeStamp) {
        this.currentTimeStamp = currentTimeStamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Notification that = (Notification) o;

        if (currentTimeStamp != that.currentTimeStamp) return false;
        if (message != null ? !message.equals(that.message) : that.message != null) return false;
        if (name != null ? !name.equals(that.name) : that.name != null) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = name != null ? name.hashCode() : 0;
        result = 31 * result + (message != null ? message.hashCode() : 0);
        result = 31 * result + (int) (currentTimeStamp ^ (currentTimeStamp >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "Notification{" +
                "name='" + name + '\'' +
                ", message='" + message + '\'' +
                ", currentTimeStamp=" + currentTimeStamp +
                '}';
    }
}
Here is the producer:
public class KafkaProducer {

    static String topic = "kafka-tutorial";

    public static void main(String[] args) {
        System.out.println("Start Kafka producer");
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, Notification> producer =
                new kafka.javaapi.producer.Producer<String, Notification>(producerConfig);

        KeyedMessage<String, Notification> message =
                new KeyedMessage<String, Notification>(topic, createNotification());

        System.out.println("send Message to broker");
        producer.send(message);
        producer.close();
    }

    private static Notification createNotification() {
        Notification notification = new Notification();
        notification.setMessage("Sample Message");
        notification.setName("Sajith");
        notification.setCurrentTimeStamp(System.currentTimeMillis());
        return notification;
    }
}
And here is the consumer:
public class KafkaConcumer extends Thread {

    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "kafka-tutorial";
    ConsumerConnector consumerConnector;

    public KafkaConcumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "test-group");
        properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer");
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        System.out.println("It :" + it.size());
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for (MessageAndOffset messageAndOffset : messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}
Finally, I used a CustomSerializer to serialize and deserialize the objects.
public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> {

    public CustomSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Notification o) {
        return new byte[0];
    }

    @Override
    public Notification fromBytes(byte[] bytes) {
        return null;
    }
}
Can anyone tell me what the problem is? Is this the right approach?
You have two issues.
First, your serializer doesn't have any logic. It returns an empty byte array for every object it serializes, and it returns a null object whenever it is asked to deserialize one. You need to put code in there that actually serializes and deserializes your objects.
Second, if you plan on using the JVM's native serialization and deserialization logic, you need to add a serialVersionUID to the bean that will be transferred, like this:
private static final long serialVersionUID = 123L;
You can use any value you like. When the object is deserialized, the serialVersionUID in the serialized object is compared with the value specified in the loaded class definition. If the two differ, the JVM assumes that, even though it has a class definition loaded, it is not the right version of that class definition, and deserialization will fail. If you don't specify a serialVersionUID in your class definition, the JVM will make one up for you, and two different JVMs (one for the producer and one for the consumer) will almost certainly make up different values.
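For example, the Notification bean could declare it like this (a minimal sketch; 123L is just the arbitrary value from above):
public class Notification implements Serializable {

    // Must be the same fixed value in the producer's and the consumer's copy of the class.
    private static final long serialVersionUID = 123L;

    private String name;
    private String message;
    private long currentTimeStamp;

    // getters, setters, equals, hashCode and toString unchanged
}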
Edit
If you want to take advantage of default Java serialization, you need to make your serializer look something like this:
public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> {

    public CustomSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Notification o) {
        try {
            // Write the object using standard Java serialization.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            return baos.toByteArray();
        } catch (IOException e) {
            return new byte[0];
        }
    }

    @Override
    public Notification fromBytes(byte[] bytes) {
        try {
            // Read the object back using standard Java deserialization.
            return (Notification) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
        } catch (Exception e) {
            return null;
        }
    }
}
Create a custom serializer and deserializer: Kafka needs a way to serialize and deserialize the payload, and we have to provide both implementations ourselves.
You need to add the Jackson library (jackson-databind) to get the ObjectMapper class:
FasterXML Jackson – 2.8.6
Example – Serializer
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

    @Override
    public byte[] serialize(String arg0, Object arg1) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        TestModel model = (TestModel) arg1;
        try {
            retVal = objectMapper.writeValueAsString(model).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map map, boolean bln) {
    }
}
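On the producer side, the matching serializer is wired in through the producer configuration. A sketch using the new producer API (the TestModel no-arg constructor, topic name and broker address are assumptions, not part of the answer):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PayloadSerializer.class.getName());

KafkaProducer<String, TestModel> producer = new KafkaProducer<String, TestModel>(props);
TestModel model = new TestModel();  // hypothetical payload instance; populate fields as needed
producer.send(new ProducerRecord<String, TestModel>("kafka-tutorial", model));
producer.close();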
Deserializer
public class PayloadDeserializer implements Deserializer {

    @Override
    public void close() {
    }

    @Override
    public TestModel deserialize(String arg0, byte[] arg1) {
        ObjectMapper mapper = new ObjectMapper();
        TestModel testModel = null;
        try {
            testModel = mapper.readValue(arg1, TestModel.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return testModel;
    }

    @Override
    public void configure(Map map, boolean bln) {
    }
}
Finally, we have to pass the deserializer class to the receiver (consumer):
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - PayloadDeserializer.class
or
deserializer.class - classpath.PayloadDeserializer
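With the new consumer API the wiring might look roughly like this (a sketch only; group id, topic name and broker address are taken from the question and are placeholders):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PayloadDeserializer.class.getName());

KafkaConsumer<String, TestModel> consumer = new KafkaConsumer<String, TestModel>(props);
consumer.subscribe(Arrays.asList("kafka-tutorial"));

// Each poll returns records whose values were already deserialized into TestModel.
ConsumerRecords<String, TestModel> records = consumer.poll(100);
for (ConsumerRecord<String, TestModel> record : records) {
    System.out.println(record.value());
}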
I strongly suggest you convert your objects to Avro objects before sending them.
It is not that hard, and it is the way Kafka is meant to transfer objects.
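As a rough illustration (a sketch only; the schema and field names mirror the Notification bean above, and no schema registry is involved), turning a notification into Avro bytes could look like this:
private static byte[] toAvroBytes(Notification n) throws IOException {
    // Schema mirrors the fields of the Notification bean above.
    String schemaJson = "{\"type\":\"record\",\"name\":\"Notification\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"message\",\"type\":\"string\"},"
            + "{\"name\":\"currentTimeStamp\",\"type\":\"long\"}]}";
    Schema schema = new Schema.Parser().parse(schemaJson);

    GenericRecord record = new GenericData.Record(schema);
    record.put("name", n.getName());
    record.put("message", n.getMessage());
    record.put("currentTimeStamp", n.getCurrentTimeStamp());

    // Encode the record to a compact binary representation.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    return out.toByteArray();  // send these bytes as the Kafka message value
}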