如何使用内部排队反应流订阅者处理 OnComplete 消息?

How to handle OnComplete message with internal queuing reactive stream subscriber?

我正在使用带有简单反应流的 Akka-Stream 1.0:

 override val requestStrategy = new MaxInFlightRequestStrategy(max = 20) {
    override def inFlightInternally: Int = messageBacklog.size

发布者将通过发送 OnComplete 消息在 N 条消息后(动态地)关闭流。

订阅者收到消息并立即进入 canceled 状态。问题是,订阅者需要一些时间来处理每条消息,这意味着我通常有一些消息积压 - 当订阅者获得 canceled 时无法再处理 - 恕我直言 ActorSubscriber.scala:195

处理消息意味着我的订阅者会将工作卸载给其他人(通过 Spray 的 ChunkedMessages 发回内容)并在消息完成后立即收到确认消息。由于 Actor 被取消,因此永远不会处理 ack 消息并处理积压。

推荐什么让我完成积压? 我可以 'invent' 我自己的 'Done Marker' 但这对我来说听起来很奇怪。显然我的代码适用于 MaxInFlightRequestStrategy 并且最多为 1 - 因为那里的需求总是只有 1 - 这意味着我从来没有积压的消息。

经过长时间的调试和尝试,我想我明白了 was/is 发生了什么 - 希望它能节省其他人的时间:

我认为我对如何实现响应式订阅者存在概念上的误解而失败了: 我在 ActorSubscriber 内部假脱机消息,并在正确的时间通过 self ! SpooledMessage 将这些假脱机消息发布回业务逻辑 - 这导致订阅者的计算变得疯狂:每条假脱机消息都被计算了两次因为 'received' 导致内部要求来自上游的更多消息。

通过处理 actor 内部的假脱机消息解决了这个问题 - 让我也可以正确使用 OnComplete:一旦收到此消息,订阅者就不会收到任何新消息,但我自己处理内部队列(不使用self ! ...),从而完成整个流处理。