如何将 KStream 打印到控制台?
How to print KStream to console?
我创建了一个 Kafka 主题并向其推送了一条消息。
所以
bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-"
打印
key1-customer1
在命令行上。
我想从这个主题创建一个 Kafka Stream,并想在控制台上打印这个 key1-customer1
。
我为它写了以下内容:
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// For illustrative purposes we disable record caches
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> customerStream = builder.stream("myTopic");
customerStream.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
这不会失败。但是,这不会像 所建议的那样在控制台上打印任何内容。
我是卡夫卡的新手。因此,任何使这项工作成功的建议都会对我有很大帮助。
我会尝试取消设置 CLIENT_ID_CONFIG
,只保留 APPLICATION_ID_CONFIG
。卡夫卡流 uses the application ID to set the client ID.
我还将验证您的 Kafka Streams 应用程序正在使用的消费者组 ID 的偏移量(此消费者组 ID 也基于您的应用程序 ID)。使用 kafka-consumer-groups.sh
工具。可能是您的 Streams 应用程序领先于您为该主题生成的所有记录,这可能是因为自动偏移重置设置为最新,或者可能是由于其他一些不易从您的问题中辨别出来的原因。
TL;DR 使用 Printed.
import org.apache.kafka.streams.kstream.Printed
val sysout = Printed
.toSysOut[String, String]
.withLabel("customerStream")
customerStream.print(sysout)
我创建了一个 Kafka 主题并向其推送了一条消息。
所以
bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-"
打印
key1-customer1
在命令行上。
我想从这个主题创建一个 Kafka Stream,并想在控制台上打印这个 key1-customer1
。
我为它写了以下内容:
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// For illustrative purposes we disable record caches
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> customerStream = builder.stream("myTopic");
customerStream.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
这不会失败。但是,这不会像
我是卡夫卡的新手。因此,任何使这项工作成功的建议都会对我有很大帮助。
我会尝试取消设置 CLIENT_ID_CONFIG
,只保留 APPLICATION_ID_CONFIG
。卡夫卡流 uses the application ID to set the client ID.
我还将验证您的 Kafka Streams 应用程序正在使用的消费者组 ID 的偏移量(此消费者组 ID 也基于您的应用程序 ID)。使用 kafka-consumer-groups.sh
工具。可能是您的 Streams 应用程序领先于您为该主题生成的所有记录,这可能是因为自动偏移重置设置为最新,或者可能是由于其他一些不易从您的问题中辨别出来的原因。
TL;DR 使用 Printed.
import org.apache.kafka.streams.kstream.Printed
val sysout = Printed
.toSysOut[String, String]
.withLabel("customerStream")
customerStream.print(sysout)