作为消息的 Streams ActorRefSource 顺序

Akka Streams ActorRefSource order of messages

我想使用 akka Streams 的 ActorRefSource 构建一个项目序列。所述源被连续地馈送数据。计算完成后,Stream 会被 Poison Pill 终止。

下面的简化示例表明了我的意图:

val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
    .mapMaterializedValue{ ref =>
      for(i <- 1 to 1000) {
        ref ! i
      }

      ref ! PoisonPill
    }

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size))

我原以为 Stream 会处理所有 1000 个元素,然后由于收到 Poison Pill 而终止。不幸的是,Stream 通常终止得更早。示例输出为:

count: 24

发送毒丸之前等待一段时间,例如1000 毫秒导致处理所有数字。

任何有关如何确保在收到 Poison Pill 之前处理完所有项目的任何想法都将不胜感激。

参见 the documentation for Source.actorRef:PoisonPill 在终止流之前不刷新缓冲区。