如何从 Source 或 Flow 中的元素生成物化值?

How to generate the materialized value from the elements in Source or Flow?

假设有一个 Source[Int, NotUsed] 类型的源。如何将其转换为 Source[Int, T],其中物化值 T 是根据源元素计算的?

示例:我想对流中的元素求和;如何实现 dumbFlow 所以结果应该是 6 而不是 42?

val dumbFlow = ??? //Flow[Int].mapMaterializedValue(_ => 42)

//code below cannot be changed
val source = Source(List(1, 2, 3)).viaMat(dumbFlow)(Keep.right)
val result = source.toMat(Sink.ignore)(Keep.left).run() 
//result: Int = 42

我知道如何使用 Sink.foldSink.head 获得相同的结果,但我需要 Source 中的具体化逻辑;无法更改 .to(Sink.ignore).

严格来说,物化值总是在单个元素通过流之前计算(包括任何 mapMaterializedValue/toMat/viaMat 等),因此不能依赖于流的元素。

如果物化值恰好是一个Future(在Scala中API),未来可以构造(虽然还没有完成)并且流可以完成Future 基于元素。通常,Future 物化值来自汇(例如,正如您所指出的 Sink.fold/Sink.head)。

SourceFlow 上的 alsoTo 运算符可让您将 Sink 嵌入到 Source/Flow 的一侧.它有一个 alsoToMat 伙伴,可以让您将 Sink 的物化值与 Source/Flow 的物化值结合起来。

所以可以

val summingSink = Sink.fold[Int, Int](0)(_ + _)
val dumbFlow: Flow[Int, Int, Future[Int]] = Flow[Int].alsoToMat(summingSink)(Keep.right)
val result: Future[Int] = source.toMat(Sink.ignore)(Keep.left).run()
result.foreach(println _)
// will eventually print 6