不允许 Akka Stream OnNext

Akka Stream OnNext is not allowed

我只是按照 akka 流 ActorPublisher 示例进行操作,有时我收到了这条消息:

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0

查看文档,他们解释说:

You send elements to the stream by calling onNext. You are allowed to send as many elements as have been requested by the stream subscriber. This amount can be inquired with totalDemand. It is only allowed to use onNext when isActive and totalDemand>0, otherwise onNext will throw IllegalStateException.

When the stream subscriber requests more elements the ActorPublisherMessage.Request message is delivered to this actor, and you can act on that event. The totalDemand is updated automatically.

如何防止 totalDemand 为零?当我收到此错误时,我丢失了我试图发送的消息。

这是我一直关注的示例:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

这是我的class测试

object Test extends App {

  implicit val actorSystem = ActorSystem("ReactiveKafka")
  implicit val materializer = ActorFlowMaterializer()

  val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
  val publisher = kafka.consume("test", "groupName", new StringDecoder())

  val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")

  Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))

}

好吧,我收到了来自 kafka 的消息,我正在传递给 WorkerActor,但是当向 Kafka 发送 10 messages/sec 时,由于这个错误,其中一些丢失了。

更新

我遇到了此处描述的错误(使用相同的库):

https://github.com/softwaremill/reactive-kafka/issues/11

我使用缓冲区解决了我的问题,但看起来这个 PR 会解决问题。

https://github.com/softwaremill/reactive-kafka/pull/13

如果下游水槽没有任何需求,那么您唯一的选择是

  1. 告诉数据源馈送 Worker 没有需求,这样数据源就可以停止生成消息,直到有更多需求出现(反应式解决方案)。
  2. 缓冲消息,直到您从接收器获得一些需求,这可能会填满您的缓冲区并且您无论如何都会丢弃消息。
  3. 当需求为 0 时删除消息(这似乎是您当前的实现)。

但是"back-pressure"的重点是防止在没有需求的时候调用onNext

要实现上面的 buffering 选项,您可以在 Actor 内部或外部进行缓冲:

  • 内部缓冲区:查看 documentation 中的 "ActorPublisher" 示例,了解在为 ActorPublisher 提供数据的 Actor 中进行缓冲的示例。
  • 外部缓冲区:在流中使用缓冲实体化器或 Flow.buffer 使用外部缓冲区。