map 和 mapAsync 之间的区别

Difference between map and mapAsync

谁能解释一下 map 和 mapAsync 之间的区别 w.r.t AKKA 流? In the documentation据说

Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered

为什么我们不能在这里简单地映射?我假设 Flow、Source、Sink 本质上都是 Monadic,因此 map 应该可以正常工作 w.r.t 这些本质上的延迟?

签名

差异在 signatures 中得到了最好的强调:Flow.map 接受一个函数,returns 一个类型 TFlow.mapAsync 接受一个函数returns 类型 Future[T]

实例

举个例子,假设我们有一个函数可以根据用户 ID 查询数据库中的用户全名:

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant

给定一个包含 UserID 值的 akka 流 Source,我们可以在流中使用 Flow.map 来查询数据库并将全名打印到控制台:

val userIDSource : Source[UserID, _] = ???

val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()

此方法的一个限制是此流一次只能进行 1 个数据库查询。此串行查询将是 "bottleneck" 并且可能会阻止我们流中的最大吞吐量。

我们可以尝试使用 Future:

通过并发查询来提高性能
def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }

val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()

这个简单化的附录的问题在于我们有效地消除了背压。

Sink只是把Future拉进来,加了一个foreach println,相对于数据库查询来说还是比较快的。该流将不断地将需求传播到 Source 并在 Flow.map 内产生更多的 Futures。所以databaseLookup运行的并发数没有限制。不受约束的并行查询最终可能会使数据库过载。

Flow.mapAsync 救援;我们可以同时进行数据库访问,同时限制同时查找的数量:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()

还要注意 Sink.foreach 变得更简单了,它不再接受 Future[FullName] 而是只接受 FullName

无序异步映射

如果不需要维护 UserIDs 到 FullNames 的顺序,那么您可以使用 Flow.mapAsyncUnordered。例如:您只需要将所有名称打印到控制台,但不关心它们的打印顺序。