Like - 将 Flow 转换为 Collection 或 Publisher

Akka - convert Flow into Collection or Publisher

我正在尝试将 Akka Source 分成两个单独的。

  val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convert to Seq[BodyPartEntity]
  val dataFlow    = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convert to Publisher[BodyPartEntity]

  implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) {
    def partition(left: Sink[BodyPartEntity, NotUsed], right: Sink[BodyPartEntity, NotUsed]): Graph[ClosedShape, NotUsed] = {
      GraphDSL.create() { implicit builder =>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        val partition = builder.add(Partition[FormData.BodyPart](2, element => if (element.getName == "request") 0 else 1))
        source ~> partition.in
        partition.out(0).map(_.getEntity) ~> left
        partition.out(1).map(_.getEntity) ~> right
        ClosedShape
      }
    }
  }

如何将requestFlow转换成Seq[BodyPartEntity]dataFlow转换成Publisher[BodyPartEntity]

您可以为此使用 BroadcastHub。来自文档:

A BroadcastHub can be used to consume elements from a common producer by a dynamic set of consumers.

简化代码:

val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  Source(1 to 5).toMat(
    BroadcastHub.sink(bufferSize = 4))(Keep.right)

val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

// Process the messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))