Akka-streams - 如何访问流的物化值
Akka-streams - how to access the materialized value of the stream
我正在学习使用 Akka 流,并且非常喜欢它,但是具体化部分对我来说仍然有些神秘。
... trigger the immediate shutdown of a specific pool by calling
shutdown() on the HostConnectionPool instance that the pool client
flow materializes into
如何获取 HostConnectionPool 实例?
更重要的是,我想大致了解如何访问物化值并执行某些操作或从中检索信息。
提前感谢您提供任何文档指示或解释。
这是通过您问题中提供的 Source.viaMat
function. Extending the code example from the link 完成的:
import akka.http.scaladsl.Http.HostConnectionPool
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RunnableGraph
val poolClientFlow = Http().cachedHostConnectionPool[Int]("akka.io")
val graph: RunnableGraph[(HostConnectionPool, Future[(Try[HttpResponse], Int)])] =
Source.single(HttpRequest(uri = "/") -> 42)
.viaMat(poolClientFlow)(Keep.right)
.toMat(Sink.head)(Keep.both)
val (pool, fut) = graph.run()
pool.shutdown()
由于 Source.single
实体化为 Unit
,因此 Keep.right
表示要保留 poolClientFlow
实体化的 HostConnectionPool
。在 .toMat
函数中,Keep.both
表示让左侧池远离流,右侧 Future
来自 Sink
.
我正在学习使用 Akka 流,并且非常喜欢它,但是具体化部分对我来说仍然有些神秘。
... trigger the immediate shutdown of a specific pool by calling shutdown() on the HostConnectionPool instance that the pool client flow materializes into
如何获取 HostConnectionPool 实例?
更重要的是,我想大致了解如何访问物化值并执行某些操作或从中检索信息。
提前感谢您提供任何文档指示或解释。
这是通过您问题中提供的 Source.viaMat
function. Extending the code example from the link 完成的:
import akka.http.scaladsl.Http.HostConnectionPool
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RunnableGraph
val poolClientFlow = Http().cachedHostConnectionPool[Int]("akka.io")
val graph: RunnableGraph[(HostConnectionPool, Future[(Try[HttpResponse], Int)])] =
Source.single(HttpRequest(uri = "/") -> 42)
.viaMat(poolClientFlow)(Keep.right)
.toMat(Sink.head)(Keep.both)
val (pool, fut) = graph.run()
pool.shutdown()
由于 Source.single
实体化为 Unit
,因此 Keep.right
表示要保留 poolClientFlow
实体化的 HostConnectionPool
。在 .toMat
函数中,Keep.both
表示让左侧池远离流,右侧 Future
来自 Sink
.