在 akka 流 divertTo 或 alsoTo 中保留物化值类型
Keep materialized value type in akka stream divertTo or alsoTo
我尝试创建一个 Source 类型的 kafka 消费者流
Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control
.
从这个来源我想根据一些谓词使用不同的路径。
因此我想使用 divertTo
方法,有时使用 alsoTo
方法。
两种方法都接受 Sink。我的问题是,当我构建这个 Sink 时,我想让 Sink 具有源提供的物化值类型 Consumer.Control
。
我现在正在做的是像这样建造水槽
private Sink<SomeType, NotUsed> sinkForPathA(){
Flow.of(SomeType.class)
.to(Committer.sink(committerSettings));
}
正如您可能注意到的,物化值类型现在是 NotUsed,这是不需要的。我想要的是:
private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
Flow.of(SomeType.class)
.toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}
是否可以通过某种方式创建具有预定义物化值类型的流,而不仅仅是 NotUsed
?
你当然可以创建一个 Flow
具有内部 Sink
的物化值。
以Sink.seq()
的物化值类型CompletionStage<List<T>>
为例
如果你说创建一个像这样的 Flow
:
Flow.of(Integer.class)
.to(Sink.seq());
然后正如您指出的那样,您将在返回的 Sink
中得到 NotUsed
。诀窍是使用 toMat
和 keepRight
来保留 Sink
的物化值类型。它会像这样:
Flow.of(Integer.class)
.toMat(Sink.seq(), Keep.right());
现在您将得到 Sink<Integer, CompletionStage<List<Integer>>>
作为结果。
我尝试创建一个 Source 类型的 kafka 消费者流
Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control
.
从这个来源我想根据一些谓词使用不同的路径。
因此我想使用 divertTo
方法,有时使用 alsoTo
方法。
两种方法都接受 Sink。我的问题是,当我构建这个 Sink 时,我想让 Sink 具有源提供的物化值类型 Consumer.Control
。
我现在正在做的是像这样建造水槽
private Sink<SomeType, NotUsed> sinkForPathA(){
Flow.of(SomeType.class)
.to(Committer.sink(committerSettings));
}
正如您可能注意到的,物化值类型现在是 NotUsed,这是不需要的。我想要的是:
private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
Flow.of(SomeType.class)
.toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}
是否可以通过某种方式创建具有预定义物化值类型的流,而不仅仅是 NotUsed
?
你当然可以创建一个 Flow
具有内部 Sink
的物化值。
以Sink.seq()
的物化值类型CompletionStage<List<T>>
为例
如果你说创建一个像这样的 Flow
:
Flow.of(Integer.class)
.to(Sink.seq());
然后正如您指出的那样,您将在返回的 Sink
中得到 NotUsed
。诀窍是使用 toMat
和 keepRight
来保留 Sink
的物化值类型。它会像这样:
Flow.of(Integer.class)
.toMat(Sink.seq(), Keep.right());
现在您将得到 Sink<Integer, CompletionStage<List<Integer>>>
作为结果。