Akka Streams:如何在 2 个相关流的系统中建模 capacity/rate 限制?
Akka Streams: How do I model capacity/rate limiting within a system of 2 related streams?
假设我有一个披萨烤箱和一系列需要烘烤的披萨。
每次我把披萨放进烤箱时,我都会在 phone 上设置一个计时器。一旦关闭,我就把披萨从烤箱里拿出来,给任何想要的人,容量就可用了。
我这里有 2 个来源,一个是待烹饪的比萨饼队列,另一个是在比萨饼煮熟后响起的煮蛋计时器。系统中还有 2 个接收器,一个是熟披萨的目的地,另一个是发送确认披萨已放入烤箱的地方。
我目前很天真的代表这些,如下:
Source.fromIterator(() => pizzas)
.map(putInOven) // puts in oven and sets a timer
.runWith(Sink.actorRef(confirmationDest, EndSignal))
Source.fromIterator(() => timerAlerts)
.map(removePizza)
.runWith(Sink.actorRef(pizzaDest, EndSignal))
但是,这两个流目前是完全独立的。 eggTimer 功能正常,每次收集披萨时都会将其移除。但它无法向先前的流程发出容量已可用的信号。事实上,第一个流程根本没有容量的概念,只是一加入就试图把披萨塞进烤箱。
可以使用哪些 Akka 概念来组合这些流程,使第一个流程仅在有容量时从队列中取出披萨,而第二个流程可以 "alert" 第一个流程改变从烤箱中取出披萨时的容量。
我的初步印象是实现这样的流程图:
┌─────────────┐
┌─>│CapacityAvail│>──┐
│ └─────────────┘ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ ┌─────────────┐ ├──>│ Zip │>─>│ PutInOven │>─>│ Confirm │
│ │ Queue │>──┘ └─────────────┘ └─────────────┘ └─────────────┘
│ └─────────────┘
│ ┌─────────────┐ ┌─────────────┐
│ │ Done │>─────>│ SendPizza │
│ └─────────────┘ └─────────────┘
│ v
│ │
└─────────┘
支撑这一点的原则是有固定数量的 CapacityAvailable 对象填充 CapacityAvail
源。它们与进入比萨队列的事件一起压缩,这意味着如果 none 可用,则不会启动比萨处理,因为压缩操作将等待它们。
然后,一旦披萨吃完,CapacityAvailable 对象就会被推回池中。
我看到此实现的主要障碍是我不确定如何为 CapacityAvail 源创建和填充池,而且我也不确定源是否也可以是接收器。是否有适合此实现的 Source/Sink/Flow 类型?
这个用例通常不能很好地映射到 Akka Streams。在引擎盖下,Akka Stream 是 reactive stream; from the documentation:
Akka Streams implementation uses the Reactive Streams interfaces
internally to pass data between the different processing stages.
您的比萨饼示例不适用于流,因为您有一些外部事件,它与流的接收器一样是需求的广播者。您公开声明的事实 "the first flow has no concept of capacity at all" 意味着您没有将流用于其预期目的。
总是可以使用一些奇怪的编码柔术来笨拙地弯曲流来解决并发问题,但您可能很难在线维护此代码。我建议您考虑使用 Futures、Actors 或普通的线程作为您的并发机制。如果您的烤箱有无限的容量来盛放烹饪比萨饼,那么就不需要流开始了。
我还会重新检查您的整个设计,因为您使用时钟时间的流逝作为需求信号(即您的 "egg timer")。这通常表示流程设计存在缺陷。如果您无法绕过此要求,那么您应该评估其他设计模式:
您可以用 mapAsyncUnordered
台和 parallelism=4
表示烤箱。 Future
的完成可以来自计时器 (http://doc.akka.io/docs/akka/2.4/scala/futures.html#After),或者您出于其他原因决定将其从烤箱中取出。
这就是我最终使用的。这几乎是问题中人造状态机的精确实现。 Source.queue
的机制比我希望的要笨拙得多,但它在其他方面非常干净。真正的接收器和源作为参数提供并在其他地方构造,因此实际实现的样板比这少一点。
RunnableGraph.fromGraph(GraphDSL.create() {
implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
// Our Capacity Bucket. Can be refilled by passing CapacityAvaiable objects
// into capacitySrc. Can be consumed by using capacity as a Source.
val (capacity, capacitySrc) =
peekMatValue(Source.queue[CapacityAvailable.type](CONCURRENT_CAPACITY,
OverflowStrategy.fail))
// Set initial capacity
capacitySrc.foreach(c =>
Seq.fill(CONCURRENT_CAPACITY)(CapacityAvailable).foreach(c.offer))
// Pull pizzas from the RabbitMQ queue
val cookQ = RabbitSource(rabbitControl, channel(qos = CONCURRENT_CAPACITY),
consume(queue("pizzas-to-cook")), body(as[TaskRun]))
// Take the blocking events stream and turn into a source
// (Blocking in a separate dispatcher)
val cookEventsQ = Source.fromIterator(() => oven.events().asScala)
.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))
// Split the events stream into two sources so 2 flows can be attached
val bc = builder.add(Broadcast[PizzaEvent](2))
// Zip pizzas with the capacity pool. Stops cooking pizzas when oven full.
// When cooking starts, send the confirmation back to rabbitMQ
cookQ.zip(AckedSource(capacity)).map(_._1)
.mapAsync(CONCURRENT_CAPACITY)(pizzaOven.cook)
.map(Message.queue(_, "pizzas-started-cooking"))
.acked ~> Sink.actorRef(rabbitControl, HostDied)
// Send the cook events stream into two flows
cookEventsQ ~> bc.in
// The first tops up the capacity pool
bc.out(0)
.mapAsync(CONCURRENT_CAPACITY)(e =>
capacitySrc.flatMap(cs => cs.offer(CapacityAvailable))
) ~> Sink.ignore
// The second sends out cooked events
bc.out(1)
.map(p => Message.queue(Cooked(p.id()), "pizzas-cooked")
) ~> Sink.actorRef(rabbitControl, HostDied)
ClosedShape
}).run()
假设我有一个披萨烤箱和一系列需要烘烤的披萨。
每次我把披萨放进烤箱时,我都会在 phone 上设置一个计时器。一旦关闭,我就把披萨从烤箱里拿出来,给任何想要的人,容量就可用了。
我这里有 2 个来源,一个是待烹饪的比萨饼队列,另一个是在比萨饼煮熟后响起的煮蛋计时器。系统中还有 2 个接收器,一个是熟披萨的目的地,另一个是发送确认披萨已放入烤箱的地方。
我目前很天真的代表这些,如下:
Source.fromIterator(() => pizzas)
.map(putInOven) // puts in oven and sets a timer
.runWith(Sink.actorRef(confirmationDest, EndSignal))
Source.fromIterator(() => timerAlerts)
.map(removePizza)
.runWith(Sink.actorRef(pizzaDest, EndSignal))
但是,这两个流目前是完全独立的。 eggTimer 功能正常,每次收集披萨时都会将其移除。但它无法向先前的流程发出容量已可用的信号。事实上,第一个流程根本没有容量的概念,只是一加入就试图把披萨塞进烤箱。
可以使用哪些 Akka 概念来组合这些流程,使第一个流程仅在有容量时从队列中取出披萨,而第二个流程可以 "alert" 第一个流程改变从烤箱中取出披萨时的容量。
我的初步印象是实现这样的流程图:
┌─────────────┐
┌─>│CapacityAvail│>──┐
│ └─────────────┘ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ ┌─────────────┐ ├──>│ Zip │>─>│ PutInOven │>─>│ Confirm │
│ │ Queue │>──┘ └─────────────┘ └─────────────┘ └─────────────┘
│ └─────────────┘
│ ┌─────────────┐ ┌─────────────┐
│ │ Done │>─────>│ SendPizza │
│ └─────────────┘ └─────────────┘
│ v
│ │
└─────────┘
支撑这一点的原则是有固定数量的 CapacityAvailable 对象填充 CapacityAvail
源。它们与进入比萨队列的事件一起压缩,这意味着如果 none 可用,则不会启动比萨处理,因为压缩操作将等待它们。
然后,一旦披萨吃完,CapacityAvailable 对象就会被推回池中。
我看到此实现的主要障碍是我不确定如何为 CapacityAvail 源创建和填充池,而且我也不确定源是否也可以是接收器。是否有适合此实现的 Source/Sink/Flow 类型?
这个用例通常不能很好地映射到 Akka Streams。在引擎盖下,Akka Stream 是 reactive stream; from the documentation:
Akka Streams implementation uses the Reactive Streams interfaces internally to pass data between the different processing stages.
您的比萨饼示例不适用于流,因为您有一些外部事件,它与流的接收器一样是需求的广播者。您公开声明的事实 "the first flow has no concept of capacity at all" 意味着您没有将流用于其预期目的。
总是可以使用一些奇怪的编码柔术来笨拙地弯曲流来解决并发问题,但您可能很难在线维护此代码。我建议您考虑使用 Futures、Actors 或普通的线程作为您的并发机制。如果您的烤箱有无限的容量来盛放烹饪比萨饼,那么就不需要流开始了。
我还会重新检查您的整个设计,因为您使用时钟时间的流逝作为需求信号(即您的 "egg timer")。这通常表示流程设计存在缺陷。如果您无法绕过此要求,那么您应该评估其他设计模式:
您可以用 mapAsyncUnordered
台和 parallelism=4
表示烤箱。 Future
的完成可以来自计时器 (http://doc.akka.io/docs/akka/2.4/scala/futures.html#After),或者您出于其他原因决定将其从烤箱中取出。
这就是我最终使用的。这几乎是问题中人造状态机的精确实现。 Source.queue
的机制比我希望的要笨拙得多,但它在其他方面非常干净。真正的接收器和源作为参数提供并在其他地方构造,因此实际实现的样板比这少一点。
RunnableGraph.fromGraph(GraphDSL.create() {
implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
// Our Capacity Bucket. Can be refilled by passing CapacityAvaiable objects
// into capacitySrc. Can be consumed by using capacity as a Source.
val (capacity, capacitySrc) =
peekMatValue(Source.queue[CapacityAvailable.type](CONCURRENT_CAPACITY,
OverflowStrategy.fail))
// Set initial capacity
capacitySrc.foreach(c =>
Seq.fill(CONCURRENT_CAPACITY)(CapacityAvailable).foreach(c.offer))
// Pull pizzas from the RabbitMQ queue
val cookQ = RabbitSource(rabbitControl, channel(qos = CONCURRENT_CAPACITY),
consume(queue("pizzas-to-cook")), body(as[TaskRun]))
// Take the blocking events stream and turn into a source
// (Blocking in a separate dispatcher)
val cookEventsQ = Source.fromIterator(() => oven.events().asScala)
.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))
// Split the events stream into two sources so 2 flows can be attached
val bc = builder.add(Broadcast[PizzaEvent](2))
// Zip pizzas with the capacity pool. Stops cooking pizzas when oven full.
// When cooking starts, send the confirmation back to rabbitMQ
cookQ.zip(AckedSource(capacity)).map(_._1)
.mapAsync(CONCURRENT_CAPACITY)(pizzaOven.cook)
.map(Message.queue(_, "pizzas-started-cooking"))
.acked ~> Sink.actorRef(rabbitControl, HostDied)
// Send the cook events stream into two flows
cookEventsQ ~> bc.in
// The first tops up the capacity pool
bc.out(0)
.mapAsync(CONCURRENT_CAPACITY)(e =>
capacitySrc.flatMap(cs => cs.offer(CapacityAvailable))
) ~> Sink.ignore
// The second sends out cooked events
bc.out(1)
.map(p => Message.queue(Cooked(p.id()), "pizzas-cooked")
) ~> Sink.actorRef(rabbitControl, HostDied)
ClosedShape
}).run()