使用 Akka Streams 流式传输过去和未来的数据

Streaming past and future data using Akka Streams

我试图通过以下示例更好地理解 Akka Streams 的概念。考虑一个银行账户。它有过去的交易历史,并且会有新的交易到来。现在我们想将它用作 Akka 流的源。但是它的数据会被用在3种不同的场景中:

  1. 消费者应用程序收集所有过去的交易并打印报告。
  2. 消费者应用程序是一种交易监视器,它打印从应用程序启动时间开始的所有新交易。
  3. 消费者应用程序结合了 (1) 和 (2) 的功能:它首先打印所有过去的交易,然后打印所有到达的交易。

就 Akka 流而言,我们这里有什么?以不同的数据提供相同流和汇的流源是否存在差异?还是来源相同(都是来自同一个银行账户的所有交易)但我们需要应用不同的过滤操作以获得不同的结果?

Akka 流源可以像 scala 中存在的任何其他 Iterable 一样组合。

根据您的示例,假设我们有保存在数据库中的历史交易。我们可以使用 slick streaming 之类的东西从数据库中获取这些交易:

val historicSource : Source[Transaction, _] = ???

还会有实时交易(可能来自消息系统):

val realtimeSource : Source[Transaction, _] = ???

这两个来源可以合并:

val combinedSource = historicSource ++ realtimeSource

这些组合事件可以被相同的流处理逻辑使用;例如,您可以 println 任何超过 $1,000.00 的交易:

val isLargeTransaction = (_ : Transaction).dollarAmount > 1000.0

val reportTransaction = (transaction : Transaction) =>
  println s"Large Transaction: $transaction"

combinedSource.filter(isLargeTransaction)
              .runWith(Sink foreach reportTransaction)