Kafka Avro 序列化器和反序列化器异常。 Avro 支持的类型
Kafka Avro Serializer and deserializer exception. Avro supported types
我看到以下错误
exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
我的 kafka producer 道具是
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
props.put("value.converter.schema.registry.url", "http://localhost:8081");
props.put("producer.type", "sync");
props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
Producer<String, TweetInfoDto> producer = new KafkaProducer(props);
我的kafka消费道具是
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "twitterCrawler");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("schema.registry.url", "http://localhost:8081");
props.put("value.converter.schema.registry.url", "http://localhost:8081");
Consumer<String, TweetInfoDto> consumer = new KafkaConsumer(props);
不确定我做错了什么。
TweetInfoDto
不能是您自己定义的普通 Java 对象。
例如,理想情况下,它应该通过 Avro Maven Plugin 从 Avro 模式创建。
请参阅 Schema Registry Tutorial 了解所有步骤,包括定义 AVSC 并为其生成 Java class。
补充 cricket_007 提到的内容,可以考虑使用
avro tools - Serializing and deserializing with code generation
我看到以下错误
exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
我的 kafka producer 道具是
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
props.put("value.converter.schema.registry.url", "http://localhost:8081");
props.put("producer.type", "sync");
props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
Producer<String, TweetInfoDto> producer = new KafkaProducer(props);
我的kafka消费道具是
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "twitterCrawler");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("schema.registry.url", "http://localhost:8081");
props.put("value.converter.schema.registry.url", "http://localhost:8081");
Consumer<String, TweetInfoDto> consumer = new KafkaConsumer(props);
不确定我做错了什么。
TweetInfoDto
不能是您自己定义的普通 Java 对象。
例如,理想情况下,它应该通过 Avro Maven Plugin 从 Avro 模式创建。
请参阅 Schema Registry Tutorial 了解所有步骤,包括定义 AVSC 并为其生成 Java class。
补充 cricket_007 提到的内容,可以考虑使用 avro tools - Serializing and deserializing with code generation