如何从 Source 或 Flow 中的元素生成物化值?
How to generate the materialized value from the elements in Source or Flow?
假设有一个 Source[Int, NotUsed]
类型的源。如何将其转换为 Source[Int, T]
,其中物化值 T
是根据源元素计算的?
示例:我想对流中的元素求和;如何实现 dumbFlow
所以结果应该是 6 而不是 42?
val dumbFlow = ??? //Flow[Int].mapMaterializedValue(_ => 42)
//code below cannot be changed
val source = Source(List(1, 2, 3)).viaMat(dumbFlow)(Keep.right)
val result = source.toMat(Sink.ignore)(Keep.left).run()
//result: Int = 42
我知道如何使用 Sink.fold
或 Sink.head
获得相同的结果,但我需要 Source
中的具体化逻辑;无法更改 .to(Sink.ignore)
.
严格来说,物化值总是在单个元素通过流之前计算(包括任何 mapMaterializedValue
/toMat
/viaMat
等),因此不能依赖于流的元素。
如果物化值恰好是一个Future
(在Scala中API),未来可以构造(虽然还没有完成)并且流可以完成Future
基于元素。通常,Future
物化值来自汇(例如,正如您所指出的 Sink.fold
/Sink.head
)。
Source
或 Flow
上的 alsoTo
运算符可让您将 Sink
嵌入到 Source
/Flow
的一侧.它有一个 alsoToMat
伙伴,可以让您将 Sink
的物化值与 Source
/Flow
的物化值结合起来。
所以可以
val summingSink = Sink.fold[Int, Int](0)(_ + _)
val dumbFlow: Flow[Int, Int, Future[Int]] = Flow[Int].alsoToMat(summingSink)(Keep.right)
val result: Future[Int] = source.toMat(Sink.ignore)(Keep.left).run()
result.foreach(println _)
// will eventually print 6
假设有一个 Source[Int, NotUsed]
类型的源。如何将其转换为 Source[Int, T]
,其中物化值 T
是根据源元素计算的?
示例:我想对流中的元素求和;如何实现 dumbFlow
所以结果应该是 6 而不是 42?
val dumbFlow = ??? //Flow[Int].mapMaterializedValue(_ => 42)
//code below cannot be changed
val source = Source(List(1, 2, 3)).viaMat(dumbFlow)(Keep.right)
val result = source.toMat(Sink.ignore)(Keep.left).run()
//result: Int = 42
我知道如何使用 Sink.fold
或 Sink.head
获得相同的结果,但我需要 Source
中的具体化逻辑;无法更改 .to(Sink.ignore)
.
严格来说,物化值总是在单个元素通过流之前计算(包括任何 mapMaterializedValue
/toMat
/viaMat
等),因此不能依赖于流的元素。
如果物化值恰好是一个Future
(在Scala中API),未来可以构造(虽然还没有完成)并且流可以完成Future
基于元素。通常,Future
物化值来自汇(例如,正如您所指出的 Sink.fold
/Sink.head
)。
Source
或 Flow
上的 alsoTo
运算符可让您将 Sink
嵌入到 Source
/Flow
的一侧.它有一个 alsoToMat
伙伴,可以让您将 Sink
的物化值与 Source
/Flow
的物化值结合起来。
所以可以
val summingSink = Sink.fold[Int, Int](0)(_ + _)
val dumbFlow: Flow[Int, Int, Future[Int]] = Flow[Int].alsoToMat(summingSink)(Keep.right)
val result: Future[Int] = source.toMat(Sink.ignore)(Keep.left).run()
result.foreach(println _)
// will eventually print 6