Akka 流和事务边界
Akka streams and transaction boundaries
我仍在掌握 Akka 流的概念,并试图理解当我们有一组需要以原子方式处理的项目时,如何将它们映射到场景。假设我们有一个包含多个项目的采购订单,我们需要对每个项目应用一些处理,然后将它们合并回一个值。这样的工作流是否应该成为它自己的独立流(或子流),一旦采购订单被完全处理就关闭? IE。每个采购订单都开始一个新流?或者我有一连串永无止境的采购订单?但是这样的话,会不会出现不同订单混购的问题呢?
换句话说,我想要实现的是处理不同工作流的隔离,并想知道 Akka 流是否能很好地匹配它。
直接回答您的问题:可以创建一个 "apply some processing to each item and then merge them back in a single value" 的流。
使用示例代码开发示例:
case class Item(itemId : String)
case class PurchaseOrder(orderId : String, items : Seq[Item])
val purchaseOrder : PurschaseOrder = ???
如果我们想用流处理项目,我们可以,尽管减少的确切性质在问题中是模棱两可的,所以我不会定义折叠是如何实现的:
type ProcessOutput = ???
def processItem(item : Item) : ProcessOutput = ???
val combinedResult : Future[CombinedResult] =
Source.fromIterator( purchaseOrder.items.toIterator )
.via(Flow[Item] map processItem)
.to(Sink.fold[ProcessOutput](???)(???) )
.run()
间接回答你的问题,
先考虑期货
Akka 流在需要背压时非常有用。当您连接到外部数据源时,背压很常见,因为 bp 允许您的应用程序确定数据流式传输给您的速度,因为您负责不断发出对更多数据的需求信号。
你在问题中提出的情况下不需要广播需求,这种通信需要。您已经有了一系列项目,因此没有人向...发送需求...
相反,我认为 Futures 是您描述的案例的最佳方式:
def futProcess(item : Item)(implicit ec : ExecutionContext) =
Future { processItem(item) }
// same output type as the stream run
val combinedResults : Future[CombinedResult] =
Future.sequence{ purchaseOrder.items map futProcess }
.map{ _ fold[ProcessOutput](???)(???) }
您将获得更好的性能,拥有完整的 ActorSystem 会降低复杂性,并且无论如何都会获得与流完全相同的结果...
我仍在掌握 Akka 流的概念,并试图理解当我们有一组需要以原子方式处理的项目时,如何将它们映射到场景。假设我们有一个包含多个项目的采购订单,我们需要对每个项目应用一些处理,然后将它们合并回一个值。这样的工作流是否应该成为它自己的独立流(或子流),一旦采购订单被完全处理就关闭? IE。每个采购订单都开始一个新流?或者我有一连串永无止境的采购订单?但是这样的话,会不会出现不同订单混购的问题呢?
换句话说,我想要实现的是处理不同工作流的隔离,并想知道 Akka 流是否能很好地匹配它。
直接回答您的问题:可以创建一个 "apply some processing to each item and then merge them back in a single value" 的流。
使用示例代码开发示例:
case class Item(itemId : String)
case class PurchaseOrder(orderId : String, items : Seq[Item])
val purchaseOrder : PurschaseOrder = ???
如果我们想用流处理项目,我们可以,尽管减少的确切性质在问题中是模棱两可的,所以我不会定义折叠是如何实现的:
type ProcessOutput = ???
def processItem(item : Item) : ProcessOutput = ???
val combinedResult : Future[CombinedResult] =
Source.fromIterator( purchaseOrder.items.toIterator )
.via(Flow[Item] map processItem)
.to(Sink.fold[ProcessOutput](???)(???) )
.run()
间接回答你的问题,
先考虑期货
Akka 流在需要背压时非常有用。当您连接到外部数据源时,背压很常见,因为 bp 允许您的应用程序确定数据流式传输给您的速度,因为您负责不断发出对更多数据的需求信号。
你在问题中提出的情况下不需要广播需求,
相反,我认为 Futures 是您描述的案例的最佳方式:
def futProcess(item : Item)(implicit ec : ExecutionContext) =
Future { processItem(item) }
// same output type as the stream run
val combinedResults : Future[CombinedResult] =
Future.sequence{ purchaseOrder.items map futProcess }
.map{ _ fold[ProcessOutput](???)(???) }
您将获得更好的性能,拥有完整的 ActorSystem 会降低复杂性,并且无论如何都会获得与流完全相同的结果...