Kafka stream not printing the value in the avro format
My Kafka producer is sending data in Avro format. I am reading it with an Avro schema (SpinsAvro) and trying to print the stream to the console.
However, the output is as below: the key is null and the value is garbage.
[KSTREAM-SOURCE-0000000000]: null , [B@200868f5
[KSTREAM-SOURCE-0000000000]: null , [B@841bc92
[KSTREAM-SOURCE-0000000000]: null , [B@302e9607
[KSTREAM-SOURCE-0000000000]: null , [B@6f9139fb
[KSTREAM-SOURCE-0000000000]: null , [B@cbdab3c
Here is the code:
public class TestStream {
    public static void main(final String[] args) throws Exception {
        final String bootstrapServers = "kafka-XXX:9092";
        final String schemaRegistryUrl = "http://XXX:8081";
        final KafkaStreams streams = buildStream(
                bootstrapServers,
                schemaRegistryUrl,
                "/tmp/kafka-streams");
        streams.cleanUp();
        streams.start();
        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    static KafkaStreams buildStream(final String bootstrapServers,
                                    final String schemaRegistryUrl,
                                    final String stateDir) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lambda-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "lambda-example-client");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        streamsConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        streamsConfiguration.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        final KStreamBuilder builder = new KStreamBuilder();
        // read the source stream
        final KStream<String, SpinsAvro> feeds = builder.stream("spins_topic");
        feeds.print();
        return new KafkaStreams(builder, streamsConfiguration);
    }
}
Your configuration is incorrect. Kafka Streams uses Serdes, which bundle the serializer and deserializer for a single type in one class. Setting

streamsConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
streamsConfiguration.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

has no effect: those keys are ignored by Kafka Streams, so the default byte-array Serde is used instead, which is why print() shows the toString() of a byte[] such as [B@200868f5. You need to use DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG and pass in appropriate Serdes.
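A minimal sketch of the corrected configuration, assuming SpinsAvro is an Avro-generated SpecificRecord and Confluent's kafka-streams-avro-serde artifact is on the classpath (the SpecificAvroSerde class and the specific-reader flag come from that library, not from your original code):

```java
// Sketch: configure default Serdes instead of plain deserializer properties.
// Assumes SpinsAvro is an Avro-generated SpecificRecord and that
// io.confluent:kafka-streams-avro-serde is on the classpath.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.class);
// Tell the Avro deserializer to produce SpinsAvro instead of GenericRecord.
streamsConfiguration.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
```

The schema.registry.url property you already set is still required, since SpecificAvroSerde fetches writer schemas from the registry.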
Serdes for basic types are already provided via the Serdes class (e.g., Serdes.Long()). There is also the class WrapperSerde that you can use to plug in a custom serializer and deserializer to create a custom Serde (or you can simply implement the Serde interface from scratch).
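The wrapper pattern above can be sketched with Serdes.serdeFrom(), which pairs a serializer and deserializer into a Serde. The Point type and its comma-separated encoding here are purely hypothetical, chosen only to keep the example self-contained; it needs just kafka-clients 2.0+ (where Serializer/Deserializer are functional interfaces):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerdeExample {
    // Hypothetical value type, for illustration only.
    static final class Point {
        final int x;
        final int y;
        Point(int x, int y) { this.x = x; this.y = y; }
    }

    // Build a Serde<Point> by wrapping a serializer/deserializer pair.
    static Serde<Point> pointSerde() {
        Serializer<Point> ser = (topic, p) ->
                (p.x + "," + p.y).getBytes(StandardCharsets.UTF_8);
        Deserializer<Point> de = (topic, bytes) -> {
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",");
            return new Point(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
        };
        return Serdes.serdeFrom(ser, de);
    }

    public static void main(String[] args) {
        Serde<Point> serde = pointSerde();
        byte[] bytes = serde.serializer().serialize("any-topic", new Point(3, 4));
        Point p = serde.deserializer().deserialize("any-topic", bytes);
        System.out.println(p.x + " " + p.y); // prints "3 4"
    }
}
```

Such a Serde can then be passed to DEFAULT_VALUE_SERDE_CLASS_CONFIG (as a class with a no-arg constructor) or directly to the stream/table builder methods that accept Serdes.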
See the documentation for more details: https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html