我们可以使用 CompletableFutures 进行并行 Kafka Streams 处理吗
Can we do parallel Kafka Streams processing with CompletableFutures
是否可以使用 Java CompletableFutures 在 Kafka 流应用程序中进行并行工作?
我想读取 1 个 Kafka 主题,创建两个窗口计数,1 个用于分钟,另一个用于小时,但它们是并行进行的。
我写了一些示例代码。我能够让它工作但查看 Kafka 流文档,因为 KafkaStreams 为每个分区分配 1 个任务并且它不能超过一个线程我不确定这段代码是否会产生预期的效果。
CompletableFuture completableFutureOfMinute = CompletableFuture.runAsync(() -> {
inputStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"minute-store")
.withRetention(Duration.ofMinutes(1)))
.toStream()
.to("result-topic");
});
CompletableFuture completableFutureOfHour = CompletableFuture.runAsync(() -> {
inputStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"hour-store")
.withRetention(Duration.ofHours(1)))
.toStream()
.to("result-topic-2", produced);
});
final CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(completableFutureOfMinute,
completableFutureOfHour);
try {
combinedFutures.get();
} catch (final Exception ex) {
}
你的程序好像不对
请注意,使用 DSL,您基本上 assemble 一个数据流程序,数据处理仅在您调用 KafkaStreams#start()
时才开始。因此,在指定处理逻辑时使用 Futures
没有帮助,因为尚未处理任何数据。
Kafka Streams 基于任务并行化。因此,如果您想并行处理两个 windows,则需要“复制”输入主题以将您的程序(称为 Topology
)拆分为独立的部分(称为 SubTopology
):
KStream input = builder.stream(...);
input.groupByKey().windowBy(/* 1 min */).count(...);
input.repartition().groupByKey().windowBy(/* 1 hour */).count();
使用 repartition()
你的程序将被分成两个子拓扑,你将为每个子拓扑分配任务,这些任务可以由不同的线程并行处理。
但是,我真的怀疑这个程序是否会增加你的吞吐量。如果你真的想提高你的吞吐量,你应该增加输入主题分区的数量以获得更多的并行任务。
是否可以使用 Java CompletableFutures 在 Kafka 流应用程序中进行并行工作?
我想读取 1 个 Kafka 主题,创建两个窗口计数,1 个用于分钟,另一个用于小时,但它们是并行进行的。
我写了一些示例代码。我能够让它工作但查看 Kafka 流文档,因为 KafkaStreams 为每个分区分配 1 个任务并且它不能超过一个线程我不确定这段代码是否会产生预期的效果。
CompletableFuture completableFutureOfMinute = CompletableFuture.runAsync(() -> {
inputStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"minute-store")
.withRetention(Duration.ofMinutes(1)))
.toStream()
.to("result-topic");
});
CompletableFuture completableFutureOfHour = CompletableFuture.runAsync(() -> {
inputStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"hour-store")
.withRetention(Duration.ofHours(1)))
.toStream()
.to("result-topic-2", produced);
});
final CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(completableFutureOfMinute,
completableFutureOfHour);
try {
combinedFutures.get();
} catch (final Exception ex) {
}
你的程序好像不对
请注意,使用 DSL,您基本上 assemble 一个数据流程序,数据处理仅在您调用 KafkaStreams#start()
时才开始。因此,在指定处理逻辑时使用 Futures
没有帮助,因为尚未处理任何数据。
Kafka Streams 基于任务并行化。因此,如果您想并行处理两个 windows,则需要“复制”输入主题以将您的程序(称为 Topology
)拆分为独立的部分(称为 SubTopology
):
KStream input = builder.stream(...);
input.groupByKey().windowBy(/* 1 min */).count(...);
input.repartition().groupByKey().windowBy(/* 1 hour */).count();
使用 repartition()
你的程序将被分成两个子拓扑,你将为每个子拓扑分配任务,这些任务可以由不同的线程并行处理。
但是,我真的怀疑这个程序是否会增加你的吞吐量。如果你真的想提高你的吞吐量,你应该增加输入主题分区的数量以获得更多的并行任务。