map 和 mapAsync 之间的区别
Difference between map and mapAsync
谁能解释一下 map 和 mapAsync 之间的区别 w.r.t AKKA 流? In the documentation据说
Stream transformations and side effects involving external non-stream
based services can be performed with mapAsync or mapAsyncUnordered
为什么我们不能在这里简单地映射?我假设 Flow、Source、Sink 本质上都是 Monadic,因此 map 应该可以正常工作 w.r.t 这些本质上的延迟?
签名
差异在 signatures 中得到了最好的强调:Flow.map
接受一个函数,returns 一个类型 T
而 Flow.mapAsync
接受一个函数returns 类型 Future[T]
。
实例
举个例子,假设我们有一个函数可以根据用户 ID 查询数据库中的用户全名:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
给定一个包含 UserID
值的 akka 流 Source
,我们可以在流中使用 Flow.map
来查询数据库并将全名打印到控制台:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
此方法的一个限制是此流一次只能进行 1 个数据库查询。此串行查询将是 "bottleneck" 并且可能会阻止我们流中的最大吞吐量。
我们可以尝试使用 Future
:
通过并发查询来提高性能
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
这个简单化的附录的问题在于我们有效地消除了背压。
Sink只是把Future拉进来,加了一个foreach println
,相对于数据库查询来说还是比较快的。该流将不断地将需求传播到 Source 并在 Flow.map
内产生更多的 Futures。所以databaseLookup
运行的并发数没有限制。不受约束的并行查询最终可能会使数据库过载。
Flow.mapAsync
救援;我们可以同时进行数据库访问,同时限制同时查找的数量:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
还要注意 Sink.foreach
变得更简单了,它不再接受 Future[FullName]
而是只接受 FullName
。
无序异步映射
如果不需要维护 UserIDs 到 FullNames 的顺序,那么您可以使用 Flow.mapAsyncUnordered
。例如:您只需要将所有名称打印到控制台,但不关心它们的打印顺序。
谁能解释一下 map 和 mapAsync 之间的区别 w.r.t AKKA 流? In the documentation据说
Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered
为什么我们不能在这里简单地映射?我假设 Flow、Source、Sink 本质上都是 Monadic,因此 map 应该可以正常工作 w.r.t 这些本质上的延迟?
签名
差异在 signatures 中得到了最好的强调:Flow.map
接受一个函数,returns 一个类型 T
而 Flow.mapAsync
接受一个函数returns 类型 Future[T]
。
实例
举个例子,假设我们有一个函数可以根据用户 ID 查询数据库中的用户全名:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
给定一个包含 UserID
值的 akka 流 Source
,我们可以在流中使用 Flow.map
来查询数据库并将全名打印到控制台:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
此方法的一个限制是此流一次只能进行 1 个数据库查询。此串行查询将是 "bottleneck" 并且可能会阻止我们流中的最大吞吐量。
我们可以尝试使用 Future
:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
这个简单化的附录的问题在于我们有效地消除了背压。
Sink只是把Future拉进来,加了一个foreach println
,相对于数据库查询来说还是比较快的。该流将不断地将需求传播到 Source 并在 Flow.map
内产生更多的 Futures。所以databaseLookup
运行的并发数没有限制。不受约束的并行查询最终可能会使数据库过载。
Flow.mapAsync
救援;我们可以同时进行数据库访问,同时限制同时查找的数量:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
还要注意 Sink.foreach
变得更简单了,它不再接受 Future[FullName]
而是只接受 FullName
。
无序异步映射
如果不需要维护 UserIDs 到 FullNames 的顺序,那么您可以使用 Flow.mapAsyncUnordered
。例如:您只需要将所有名称打印到控制台,但不关心它们的打印顺序。