Kafka Stream giving weird output
I'm trying to do a basic aggregation with Kafka Streams (for the purposes of this question, just incrementing by 1 on each message). On the output topic that receives the changes made to the KTable, I get really weird output:
@B�
@C
@C�
@D
@D�
@E
@E�
@F
@F�
I know that "�" means it's printing some kind of character that doesn't exist in the character set, but I'm not sure why. Here's my code for reference:
public class KafkaMetricsAggregator {

    public static void main(final String[] args) throws Exception {
        final String bootstrapServers = args.length > 0 ? args[0] : "my-kafka-ip:9092";
        final Properties streamsConfig = new Properties();
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
        // Where to find Kafka broker(s).
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the default
        // in order to keep this example interactive.
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        // For illustrative purposes we disable record caches.
        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Class to extract the timestamp from the event object.
        streamsConfig.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "my.package.EventTimestampExtractor");

        // Set up serializers and deserializers, which we will use for overriding the default serdes
        // specified above.
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

        final KStreamBuilder builder = new KStreamBuilder();

        final KTable<String, Double> aggregatedMetrics = builder.stream(jsonSerde, jsonSerde, "test2")
            .groupBy(KafkaMetricsAggregator::generateKey, stringSerde, jsonSerde)
            .aggregate(
                () -> 0d,
                (key, value, agg) -> agg + 1,
                doubleSerde,
                "metrics-table2");

        aggregatedMetrics.to(stringSerde, doubleSerde, "metrics");

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        // Only clean up in development.
        streams.cleanUp();
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
Edit: Using aggregatedMetrics.print(); does print the correct output to the console:
[KSTREAM-AGGREGATE-0000000002]: my-generated-key , (43.0<-null)
Any ideas about what's going on?
You're using Serdes.Double() for your values, which uses an efficient binary encoding [1] for the serialized values, and that's what you're seeing on the topic. To get human-readable numbers on the console, you need to instruct the consumer to use the DoubleDeserializer too.
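To see why the topic shows text like "@B�", here is a small sketch (class name DoubleBytesDemo is made up for illustration) that encodes a double the same way Kafka's DoubleSerializer does, as 8 big-endian IEEE 754 bytes, which ByteBuffer.putDouble also produces:

```java
import java.nio.ByteBuffer;

public class DoubleBytesDemo {
    public static void main(String[] args) {
        // Serialize 37.0 as 8 big-endian IEEE 754 bytes, the same wire
        // format Kafka's DoubleSerializer writes to the topic.
        byte[] bytes = ByteBuffer.allocate(8).putDouble(37.0).array();
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
        // Prints: 40 42 80 00 00 00 00 00
        // 0x40 is '@', 0x42 is 'B', and 0x80 is not a printable character,
        // so a consumer decoding these bytes as a String shows "@B?"
        // instead of 37.0 -- exactly the pattern in the question's output.
    }
}
```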
Specify DoubleDeserializer as the value deserializer on the consumer's command line, like this:
--property value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
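For example, a full console-consumer invocation might look like the following (broker address and topic name are taken from the question's code; adjust them to your setup, and note that the keys are Strings per stringSerde):

```shell
kafka-console-consumer.sh \
  --bootstrap-server my-kafka-ip:9092 \
  --topic metrics \
  --from-beginning \
  --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
```

With the value deserializer set, the consumer decodes the 8-byte payload back into a double and prints readable numbers instead of raw bytes.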