How does Kafka Streams consume records from a topic?
Here is my code:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
// Define the topology before building it: read the topic and print each record.
// Printed.toSysOut() replaces print(null), which would throw a NullPointerException.
builder.stream(topic_name).print(Printed.toSysOut());

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API.
It uses the underlying Processor API and its implementation to read messages from Kafka topics. The detailed architecture is described here:
https://kafka.apache.org/20/documentation/streams/architecture
If you dig into the Processor API, you can see how each of these features is implemented, even though the DSL lets you invoke them with a single line of code:
https://kafka.apache.org/20/documentation/streams/developer-guide/processor-api.html
This is how Streams DSL operations work: when you write a KStream application, most operations can be invoked in a few lines of code, but there is a full implementation underneath. Internally, every operation is converted into a ProcessorNode: reading from a topic becomes a SourceNode, writing to a topic becomes a SinkNode, and all of these nodes are added to the Topology one after another.
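To make this concrete, here is a minimal Processor API sketch (the topic names, node names and UppercaseProcessor are made up for illustration): addSource creates a SourceNode, addProcessor a ProcessorNode, and addSink a SinkNode, all wired into the Topology.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

// A trivial processor that upper-cases each value before forwarding it downstream.
public class UppercaseProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        context().forward(key, value.toUpperCase());
    }
}

Topology topology = new Topology();
// Reading from a topic -> SourceNode
topology.addSource("Source", Serdes.String().deserializer(), Serdes.String().deserializer(), "input-topic");
// Custom logic -> ProcessorNode
topology.addProcessor("Process", UppercaseProcessor::new, "Source");
// Writing to a topic -> SinkNode
topology.addSink("Sink", "output-topic", Serdes.String().serializer(), Serdes.String().serializer(), "Process");

The DSL builds this same kind of node graph for you behind the scenes.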
You can find more details in the source code of StreamsBuilder and StreamTask; it will give you an idea of how the topology is built and run.
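A quick way to see the nodes a StreamsBuilder produces is Topology#describe(); a minimal sketch, assuming builder and props are configured as in the snippets above:

Topology topology = builder.build();
// TopologyDescription lists every source, processor and sink node,
// grouped by the sub-topology each belongs to.
System.out.println(topology.describe());
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();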
Below is a sample WordCount KStream application. Assume "wordcount-input" is the input topic and "wordcount-output" is the output topic:
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the beginning of the topic when no committed offsets exist
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
final KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
// Write the `KTable<String, Long>` to the output topic.
wordCounts.toStream().to("wordcount-output", Produced.with(stringSerde, longSerde));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
// Delete local state so every run of this demo starts from scratch.
streams.cleanUp();
streams.start();
// Close the application cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
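To verify the result, one option is a plain KafkaConsumer on the output topic. A sketch, assuming the topic and server names used above; note the Long deserializer, because the counts were written with a Long serde:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-output-reader");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(Collections.singletonList("wordcount-output"));
    while (true) {
        ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, Long> record : records) {
            System.out.printf("%s -> %d%n", record.key(), record.value());
        }
    }
}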