Performing Actions Before/After substream in akka-streams 2.4+

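I am working on a stream of data from a file that is grouped by key. I have created a class called KeyChanges[T,K] with an apply method that splits the stream by key. Before the first item of a substream is processed, I need to retrieve some data from a database. Once each substream is complete, I need to send a message to a queue. With a standard Scala sequence I would do something like this: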

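import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// retrieveMyData(k): Future[Data], process(data, v): Future[Try[_]] and
// sendMessage(count, key) are the application's own helpers.
val groups: Map[Key, Seq[Value]] = stream.groupBy(v => v.k)
val groupSummaryF = Future.sequence(groups.map { case (k, group) =>
  // "before": load the data once per group
  retrieveMyData(k).flatMap { data =>
    Future.sequence(group.map(v => process(data, v))).map(
      // count the successfully processed elements, logging the failures
      k -> _.foldLeft(0) { (a, t) =>
        t match {
          case Success(_) => a + 1
          case Failure(ex) =>
            println(s"failure: $ex")
            a
        }
      }
    ).andThen {
      // "after": the whole group has been processed
      case Success((key, count)) =>
        sendMessage(count, key)
    }
  }
})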
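For comparison, here is a self-contained version of the plain-Scala approach above. The helpers `retrieveMyData`, `process` and `sendMessage` and the `Value` case class are hypothetical stubs (the question does not define them); `process` is assumed to return a `Future[Try[Int]]` so that individual failures are counted rather than failing the whole group, and `count(_.isSuccess)` replaces the `foldLeft` (without the failure logging).

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object PlainScalaGrouping {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical element type: a key plus a payload.
  final case class Value(k: String, payload: Int)

  // Hypothetical stand-in for the database lookup done once per key.
  def retrieveMyData(k: String): Future[String] = Future.successful(s"data-for-$k")

  // Hypothetical per-element processing; negative payloads count as failures.
  def process(data: String, v: Value): Future[Try[Int]] =
    Future.successful(
      if (v.payload >= 0) Success(v.payload)
      else Failure(new IllegalArgumentException(s"bad payload ${v.payload}"))
    )

  // Hypothetical queue producer; it only records what would have been sent.
  val sent = new java.util.concurrent.ConcurrentLinkedQueue[(Int, String)]()
  def sendMessage(count: Int, key: String): Unit = { sent.add((count, key)); () }

  def summarize(stream: Seq[Value]): Future[Iterable[(String, Int)]] = {
    val groups: Map[String, Seq[Value]] = stream.groupBy(_.k)
    Future.sequence(groups.map { case (k, group) =>
      retrieveMyData(k).flatMap { data => // "before": one lookup per group
        Future.sequence(group.map(v => process(data, v)))
          .map(results => k -> results.count(_.isSuccess))
          // "after": andThen's future completes only once sendMessage has run
          .andThen { case Success((key, count)) => sendMessage(count, key) }
      }
    })
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(Value("a", 1), Value("a", -1), Value("b", 2))
    val summary = Await.result(summarize(input), 5.seconds)
    println(summary.toMap) // one success count per key
  }
}
```

This only works because the whole input is already in memory; with a stream, the group boundaries are not known up front, which is exactly the difficulty described next.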

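I would like to do something similar with Akka Streams. For the data retrieval, I could just cache the data and call the retrieval function for every element, but for the queue messages I really do need to know when a substream completes. So far I have not found a way to do this. Any ideas?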

You can just run the stream and perform the action from the Sink.

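import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future
import scala.util.Random

implicit val system: ActorSystem = ActorSystem()
// on Akka 2.6+ ActorMaterializer is deprecated and the implicit ActorSystem is enough
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for Future.map / Future.sequence

val categories = Array("DEBUG", "INFO", "WARN", "ERROR")

// assume we have a stream from a file which produces categoryId -> message
val lines = (1 to 100).map(x => (Random.nextInt(categories.length), s"message $x"))

def loadDataFromDatabase(categoryId: Int): Future[String] =
  Future.successful(categories(categoryId))

// assume this emits a message to the queue
def emitToQueue(x: (String, Int)): Unit =
  println(s"${x._2} messages from category ${x._1}")

val flow =
  Flow[(Int, String)]
    .groupBy(4, _._1) // one substream per category, at most 4
    .fold((0, List.empty[String])) { case ((_, acc), (catId, elem)) =>
      (catId, elem :: acc)
    } // emits a single element, when the substream completes
    .mapAsync(1) { case (catId, messages) =>
      // here you load your stuff from the database
      loadDataFromDatabase(catId).map(cat => (cat, messages))
    } // here you may want to do some more processing
    .map(x => (x._1, x._2.size))
    .mergeSubstreams

// assume the source is a file
Source.fromIterator(() => lines.iterator)
  .via(flow)
  .to(Sink.foreach(emitToQueue))
  .run()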
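If the lookup really has to happen before the first element of each substream is processed, as the question asks, one possible variation is to use `prefixAndTail(1)` inside the substream: the first element triggers the database call, and the remaining elements are streamed through only once the data is available. This is a sketch assuming the Akka 2.4 to 2.6 `scaladsl` API; `loadDataFromDatabase` is a hypothetical stub, and building a string and counting stands in for the real per-element processing.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BeforeAfterSubstream {
  implicit val system: ActorSystem = ActorSystem("before-after")
  implicit val materializer: ActorMaterializer = ActorMaterializer() // deprecated on 2.6+
  implicit val ec: ExecutionContext = system.dispatcher

  // Hypothetical lookup, called once per substream before its elements are processed.
  def loadDataFromDatabase(categoryId: Int): Future[String] =
    Future.successful(s"category-$categoryId")

  val perCategory: Flow[(Int, String), (String, Int), NotUsed] =
    Flow[(Int, String)]
      .groupBy(4, _._1)
      // emits (first element, Source of the remaining elements of this substream);
      // groupBy substreams always contain at least one element, so `first` is non-empty
      .prefixAndTail(1)
      .mapAsync(1) { case (first, rest) =>
        // "before": load the data as soon as the first element arrives
        loadDataFromDatabase(first.head._1).map(data => (data, Source(first).concat(rest)))
      }
      .flatMapConcat { case (data, elems) =>
        elems
          .map { case (_, msg) => s"$data: $msg" } // stand-in processing using the data
          .fold(0)((n, _) => n + 1)                // emits once the substream is complete
          .map(n => (data, n))
      }
      .mergeSubstreams

  def main(args: Array[String]): Unit = {
    val lines = (1 to 100).map(x => (x % 4, s"message $x"))
    val done = Source(lines)
      .via(perCategory)
      // "after": one element per finished substream, e.g. to send to a queue
      .runWith(Sink.foreach[(String, Int)] { case (cat, n) => println(s"$n messages from $cat") })
    Await.ready(done, 10.seconds)
    system.terminate()
  }
}
```

The trade-off compared with `fold` first is that elements are processed as they arrive instead of being buffered until the substream ends; only the final count is emitted at completion.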

If you want to run it over multiple files and, for example, report the totals once at the end, you can do this:

val futures = (1 to 4).map { x =>
  Source.fromIterator(() => lines.iterator).via(flow).toMat(Sink.seq[(String, Int)])(Keep.right).run()
}
Future.sequence(futures).map { results =>
  results.flatten.groupBy(_._1).foreach { case (cat, xs) =>
    val total = xs.map(_._2).sum
    println(s"$total messages from category $cat")
  }
}

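As you can see, when you run the stream with `toMat(...)(Keep.right)`, you get back the sink's materialized value, here a `Future`. It completes with the result of the stream, and once it does you can do whatever you want with it.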
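Which materialized value you get back depends on how the stream is closed. The sketch below, assuming the Akka 2.4 to 2.6 `scaladsl` API and a toy `Source(1 to 10)`, contrasts `to(...).run()`, which keeps the source's `NotUsed`, with `toMat(...)(Keep.right)` and `runWith`, which return the sink's `Future`.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object MaterializedValues {
  implicit val system: ActorSystem = ActorSystem("mat-values")
  implicit val materializer: ActorMaterializer = ActorMaterializer() // deprecated on 2.6+
  implicit val ec: ExecutionContext = system.dispatcher

  val source = Source(1 to 10)

  // `to` keeps the left (source) materialized value, so there is nothing to wait on.
  val notUsed: NotUsed = source.to(Sink.ignore).run()

  // `toMat(...)(Keep.right)` and `runWith` return the sink's materialized value instead.
  val done: Future[Done] = source.toMat(Sink.foreach[Int](println))(Keep.right).run()
  val sum: Future[Int] = source.runWith(Sink.fold[Int, Int](0)(_ + _))

  // Runs once, after the whole stream has completed.
  done.onComplete(_ => println("stream finished"))
}
```

This is why the first example above gives you no handle on completion, while the multi-file example does.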