Kafka:手动生成记录触发消费者异常
Kafka: Manually producing record triggers exception in consumer
我有一个从多个主题进行投票的消费者。
到目前为止,我只用 Java 制作了这些主题的记录,一切正常。
我使用 avro 的 confulent 工具。
现在我尝试通过终端手动生成主题。
我创建了一个 avro-producer,其模式与我的其他制作人使用的相同:
# Produce a record with one field
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic order_created-in \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"APropertie","type":{"type":"array","items":{"type":"record","name":"APropertie","fields":[{"name":"key","type":"string"},{"name":"name","type":"string"},{"name":"date","type":"string"}]}}}]}'
之后,我生成了一条遵循指定模式的记录:
{"name": "order_created", "APropertie": [{"key": "1", "name": "testname", "date": "testdate"}]}
记录已正确生成主题。 但是我的 AvroConsumer 抛出异常:
Polling
Polling
Polling
Polling
Polling
Polling
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition order_created-in-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class test specified in writer's schema whilst finding reader's schema for a SpecificRecord.
Process finished with exit code 1
有什么提示吗?
谢谢!
与生产者/消费者的配置有关。
普通生产者有这样的配置:
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
Avro 通常添加以下属性:
// avro part
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("confluent.value.schema.validation", "true");
properties.setProperty("confluent.key.schema.validation", "true");
这些必须包含在控制台制作者中。
我有一个从多个主题进行投票的消费者。 到目前为止,我只用 Java 制作了这些主题的记录,一切正常。
我使用 avro 的 confulent 工具。
现在我尝试通过终端手动生成主题。
我创建了一个 avro-producer,其模式与我的其他制作人使用的相同:
# Produce a record with one field
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic order_created-in \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"APropertie","type":{"type":"array","items":{"type":"record","name":"APropertie","fields":[{"name":"key","type":"string"},{"name":"name","type":"string"},{"name":"date","type":"string"}]}}}]}'
之后,我生成了一条遵循指定模式的记录:
{"name": "order_created", "APropertie": [{"key": "1", "name": "testname", "date": "testdate"}]}
记录已正确生成主题。 但是我的 AvroConsumer 抛出异常:
Polling
Polling
Polling
Polling
Polling
Polling
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition order_created-in-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class test specified in writer's schema whilst finding reader's schema for a SpecificRecord.
Process finished with exit code 1
有什么提示吗? 谢谢!
与生产者/消费者的配置有关。
普通生产者有这样的配置:
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
Avro 通常添加以下属性:
// avro part
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("confluent.value.schema.validation", "true");
properties.setProperty("confluent.key.schema.validation", "true");
这些必须包含在控制台制作者中。