在 akka 流 divertTo 或 alsoTo 中保留物化值类型

Keep materialized value type in akka stream divertTo or alsoTo

我尝试创建一个 Source 类型的 kafka 消费者流 Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control.

从这个来源我想根据一些谓词使用不同的路径。 因此我想使用 divertTo 方法,有时使用 alsoTo 方法。

两种方法都接受 Sink。我的问题是,当我构建这个 Sink 时,我想让 Sink 具有源提供的物化值类型 Consumer.Control。 我现在正在做的是像这样建造水槽

private Sink<SomeType, NotUsed> sinkForPathA(){
  Flow.of(SomeType.class)
    .to(Committer.sink(committerSettings));
}

正如您可能注意到的,物化值类型现在是 NotUsed,这是不需要的。我想要的是:

private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
  Flow.of(SomeType.class)
    .toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}

是否可以通过某种方式创建具有预定义物化值类型的流,而不仅仅是 NotUsed

你当然可以创建一个 Flow 具有内部 Sink 的物化值。

Sink.seq()的物化值类型CompletionStage<List<T>>为例

如果你说创建一个像这样的 Flow:

Flow.of(Integer.class)
    .to(Sink.seq());

然后正如您指出的那样,您将在返回的 Sink 中得到 NotUsed。诀窍是使用 toMatkeepRight 来保留 Sink 的物化值类型。它会像这样:

Flow.of(Integer.class)
    .toMat(Sink.seq(), Keep.right());

现在您将得到 Sink<Integer, CompletionStage<List<Integer>>> 作为结果。