Akka - convert Flow into Collection or Publisher
I am trying to split an Akka Source into two separate ones.
val requestFlow = Flow[BodyPartEntity].to(Sink.seq) // convert to Seq[BodyPartEntity]
val dataFlow = Flow[BodyPartEntity].to(Sink.asPublisher(fanout = false)) // convert to Publisher[BodyPartEntity]

implicit class EitherSourceExtension[L, R, Mat](source: Source[FormData.BodyPart, Mat]) {
  def partition(left: Sink[BodyPartEntity, NotUsed], right: Sink[BodyPartEntity, NotUsed]): Graph[ClosedShape, NotUsed] = {
    GraphDSL.create() { implicit builder =>
      import akka.stream.scaladsl.GraphDSL.Implicits._
      val partition = builder.add(Partition[FormData.BodyPart](2, element => if (element.getName == "request") 0 else 1))
      source ~> partition.in
      partition.out(0).map(_.getEntity) ~> left
      partition.out(1).map(_.getEntity) ~> right
      ClosedShape
    }
  }
}
How can I convert requestFlow into a Seq[BodyPartEntity] and dataFlow into a Publisher[BodyPartEntity]?
You can use a BroadcastHub for this. From the documentation:
A BroadcastHub can be used to consume elements from a common producer by a dynamic set of consumers.
Simplified code:
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  Source(1 to 5).toMat(
    BroadcastHub.sink(bufferSize = 4))(Keep.right)

val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

// Process the messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))
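If the goal is to get hold of the Seq and the Publisher themselves, another option (not BroadcastHub, just the Partition graph from the question) is to pass the two sinks into GraphDSL.create so their materialized values are kept instead of being dropped as NotUsed. The following is only a sketch under assumptions: it targets Akka 2.6 (where an implicit ActorSystem provides the materializer), and it uses a hypothetical Part case class with name and payload fields as a stand-in for FormData.BodyPart, so that it runs without akka-http.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import org.reactivestreams.Publisher

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PartitionSketch {
  // Hypothetical stand-in for FormData.BodyPart (assumption, not the akka-http type).
  final case class Part(name: String, payload: String)

  // Builds a closed graph whose materialized value is the pair
  // (Future of all "request" payloads, Publisher of all other payloads).
  def graph(source: Source[Part, _]): RunnableGraph[(Future[Seq[String]], Publisher[String])] = {
    val requestSink = Sink.seq[String]
    val dataSink = Sink.asPublisher[String](fanout = false)

    RunnableGraph.fromGraph(
      // Passing the sinks to create(...) keeps both materialized values.
      GraphDSL.create(requestSink, dataSink)((_, _)) { implicit builder => (left, right) =>
        import GraphDSL.Implicits._
        val partition = builder.add(
          Partition[Part](2, part => if (part.name == "request") 0 else 1))
        source ~> partition.in
        partition.out(0).map(_.payload) ~> left
        partition.out(1).map(_.payload) ~> right
        ClosedShape
      })
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("partition-sketch")
    try {
      val parts = Source(List(
        Part("request", "r1"), Part("data", "d1"), Part("data", "d2")))

      val (requestSeq, dataPublisher) = graph(parts).run()

      // The publisher side only makes progress once something subscribes to it.
      val dataSeq = Source.fromPublisher(dataPublisher).runWith(Sink.seq)

      println(Await.result(requestSeq, 3.seconds).mkString(", ")) // prints "r1"
      println(Await.result(dataSeq, 3.seconds).mkString(", "))    // prints "d1, d2"
    } finally {
      system.terminate()
    }
  }
}
```

Note that with fanout = false the Publisher accepts a single subscriber, and the whole graph is backpressured until that subscriber shows up, so the Future from Sink.seq will not complete before the Publisher side is consumed.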