根据 Consumer.committablePartitionedSource 中分配的分区数调整平行度

Adjusting parallism based on number of partitions assigned in Consumer.committablePartitionedSource

我正在尝试使用 Consumer.committablePartitionedSource() 并为每个分区创建流,如下所示

    public void setup() {
        control = Consumer.committablePartitionedSource(consumerSettings,
                Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
                .mapAsyncUnordered(Integer.MAX_VALUE, pair -> setupSource(pair, committerSettings))
                .toMat(Sink.ignore(), Consumer::createDrainingControl)
                .run(Materializer.matFromSystem(actorSystem));
    }

    private CompletionStage<Done> setupSource(Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>> pair, CommitterSettings committerSettings) {
        LOGGER.info("SETTING UP PARTITION-{} SOURCE", pair.first().partition());
        return pair.second().mapAsync(16, msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                .thenApply(param -> msg.committableOffset()))
                .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
                .runWith(Committer.sink(committerSettings), Materializer.matFromSystem(actorSystem));
    }

在为每个分区设置源时,我正在使用并行性,我想根据分配给节点的分区数来更改它。我可以在第一次将分区分配给节点时做到这一点。但是随着新节点加入集群,分配的分区将被撤销和分配。这次流不发出已经存在的分区(由于 kafka 协作再平衡协议)来重新配置并行性。

在这里,我在所有源之间共享同一个调度程序,如果我在重新平衡时保持相同的并行性,我觉得每个分区消息处理的公平机会是不可能的。我对么?请指正

如果我对你的理解正确,你希望在 Kafka 重新平衡主题分区时动态变化的 Sources 数量上有一个固定的并行性。

查看 Alpakka Kafka 文档中的第一个示例 here。可以像这样根据您的示例进行调整:

 Consumer.DrainingControl<Done> control =
      Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("chat"))
              .wireTap(p -> LOGGER.info("SETTING UP PARTITION-{} SOURCE", p.first().partition()))
              .flatMapMerge(Integer.MAX_VALUE, Pair::second)
              .mapAsync(
                16,
                msg -> CompletableFuture
                         .supplyAsync(() -> consumeMessage(msg),
                                      actorSystem.dispatcher())
                         .thenApply(param -> msg.committableOffset()))
              .withAttributes(
                ActorAttributes.supervisionStrategy(
                  ex -> Supervision.restart()))
              .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
              .run(Materializer.matFromSystem(actorSystem));

所以基本上 Consumer.committablePartitionedSource() 会在任何时候 Kafka 将分区分配给这个消费者时发出一个 Source 并且当先前分配的分区被重新平衡并从这个消费者那里拿走时会终止这样的 Source

flatMapMerge 将获取那些 Source 并合并它们输出的消息。

所有这些消息将在 mapAsync 阶段竞争以得到处理。这场比赛的公平性实际上取决于上面的 flatMapMerge,它应该给予所有 Source 平等的机会来发出他们的信息。无论有多少 Source 正在输出消息,它们在这里都将共享一个固定的并行度,我相信这就是您所追求的。

所有这些消息最终都会到达处理偏移量提交的 Commiter.sink