Akka Streams:如何在 2 个相关流的系统中建模 capacity/rate 限制?

Akka Streams: How do I model capacity/rate limiting within a system of 2 related streams?

假设我有一个披萨烤箱和一系列需要烘烤的披萨。

每次我把披萨放进烤箱时,我都会在 phone 上设置一个计时器。一旦关闭,我就把披萨从烤箱里拿出来,给任何想要的人,容量就可用了。

我这里有 2 个来源,一个是待烹饪的比萨饼队列,另一个是在比萨饼煮熟后响起的煮蛋计时器。系统中还有 2 个接收器,一个是熟披萨的目的地,另一个是发送确认披萨已放入烤箱的地方。

我目前很天真的代表这些,如下:

Source.fromIterator(() => pizzas)
    .map(putInOven) // puts in oven and sets a timer
    .runWith(Sink.actorRef(confirmationDest, EndSignal))

Source.fromIterator(() => timerAlerts)
    .map(removePizza)
    .runWith(Sink.actorRef(pizzaDest, EndSignal))

但是,这两个流目前是完全独立的。 eggTimer 功能正常,每次收集披萨时都会将其移除。但它无法向先前的流程发出容量已可用的信号。事实上,第一个流程根本没有容量的概念,只是一加入就试图把披萨塞进烤箱。

可以使用哪些 Akka 概念来组合这些流程,使第一个流程仅在有容量时从队列中取出披萨,而第二个流程可以 "alert" 第一个流程改变从烤箱中取出披萨时的容量。

我的初步印象是实现这样的流程图:

   ┌─────────────┐                                                          
┌─>│CapacityAvail│>──┐                                                      
│  └─────────────┘   │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│  ┌─────────────┐   ├──>│     Zip     │>─>│  PutInOven  │>─>│   Confirm   │
│  │    Queue    │>──┘   └─────────────┘   └─────────────┘   └─────────────┘
│  └─────────────┘                                                          
│  ┌─────────────┐       ┌─────────────┐                                    
│  │    Done     │>─────>│  SendPizza  │                                    
│  └─────────────┘       └─────────────┘                                    
│         v                                                                 
│         │                                                                 
└─────────┘                    

支撑这一点的原则是有固定数量的 CapacityAvailable 对象填充 CapacityAvail 源。它们与进入比萨队列的事件一起压缩,这意味着如果 none 可用,则不会启动比萨处理,因为压缩操作将等待它们。

然后,一旦披萨吃完,CapacityAvailable 对象就会被推回池中。

我看到此实现的主要障碍是我不确定如何为 CapacityAvail 源创建和填充池,而且我也不确定源是否也可以是接收器。是否有适合此实现的 Source/Sink/Flow 类型?

这个用例通常不能很好地映射到 Akka Streams。在引擎盖下,Akka Stream 是 reactive stream; from the documentation:

Akka Streams implementation uses the Reactive Streams interfaces internally to pass data between the different processing stages.

您的比萨饼示例不适用于流,因为您有一些外部事件,它与流的接收器一样是需求的广播者。您公开声明的事实 "the first flow has no concept of capacity at all" 意味着您没有将流用于其预期目的。

总是可以使用一些奇怪的编码柔术来笨拙地弯曲流来解决并发问题,但您可能很难在线维护此代码。我建议您考虑使用 Futures、Actors 或普通的线程作为您的并发机制。如果您的烤箱有无限的容量来盛放烹饪比萨饼,那么就不需要流开始了。

我还会重新检查您的整个设计,因为您使用时钟时间的流逝作为需求信号(即您的 "egg timer")。这通常表示流程设计存在缺陷。如果您无法绕过此要求,那么您应该评估其他设计模式:

  1. Periodic Message Scheduling
  2. Non Thread Block Timeouts

您可以用 mapAsyncUnordered 台和 parallelism=4 表示烤箱。 Future 的完成可以来自计时器 (http://doc.akka.io/docs/akka/2.4/scala/futures.html#After),或者您出于其他原因决定将其从烤箱中取出。

这就是我最终使用的。这几乎是问题中人造状态机的精确实现。 Source.queue 的机制比我希望的要笨拙得多,但它在其他方面非常干净。真正的接收器和源作为参数提供并在其他地方构造,因此实际实现的样板比这少一点。

RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Our Capacity Bucket. Can be refilled by passing CapacityAvaiable objects 
    // into capacitySrc. Can be consumed by using capacity as a Source.
    val (capacity, capacitySrc) =
      peekMatValue(Source.queue[CapacityAvailable.type](CONCURRENT_CAPACITY,
                                                        OverflowStrategy.fail))

    // Set initial capacity
    capacitySrc.foreach(c =>
      Seq.fill(CONCURRENT_CAPACITY)(CapacityAvailable).foreach(c.offer))


    // Pull pizzas from the RabbitMQ queue
    val cookQ = RabbitSource(rabbitControl, channel(qos = CONCURRENT_CAPACITY),
                             consume(queue("pizzas-to-cook")), body(as[TaskRun]))

    // Take the blocking events stream and turn into a source
    // (Blocking in a separate dispatcher)
    val cookEventsQ = Source.fromIterator(() => oven.events().asScala)
        .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))

    // Split the events stream into two sources so 2 flows can be attached
    val bc = builder.add(Broadcast[PizzaEvent](2))

    // Zip pizzas with the capacity pool. Stops cooking pizzas when oven full.
    // When cooking starts, send the confirmation back to rabbitMQ
    cookQ.zip(AckedSource(capacity)).map(_._1)
      .mapAsync(CONCURRENT_CAPACITY)(pizzaOven.cook)
      .map(Message.queue(_, "pizzas-started-cooking"))
      .acked ~> Sink.actorRef(rabbitControl, HostDied)

    // Send the cook events stream into two flows
    cookEventsQ ~> bc.in

    // The first tops up the capacity pool
    bc.out(0)
      .mapAsync(CONCURRENT_CAPACITY)(e =>
         capacitySrc.flatMap(cs => cs.offer(CapacityAvailable))
      ) ~> Sink.ignore

    // The second sends out cooked events
    bc.out(1)
      .map(p => Message.queue(Cooked(p.id()), "pizzas-cooked")
    ) ~> Sink.actorRef(rabbitControl, HostDied)

    ClosedShape
}).run()