Akka Streams:缺少源代码中的最后一个元素

Akka Streams: missing last element in Source

我正在关注 this answer 使用 Akka Streams 创建 SQS 消费者:

  def queryForMessages = {
    val messages = Sqs.receive(queueUrl, 3, 10)
    println(s"Received from sqs: ${messages.map(_.getBody)}")
    messages
  }

  def messageListStream : immutable.Stream[Iterable[SqsMessage]] = {
    queryForMessages #:: messageListStream
  }

  def messageIterator() : Iterator[SqsMessage] = messageListStream.flatten.toIterator

  Source.fromIterator(messageIterator)
    .map(_.getBody)
    .runForeach(m => println(s"Stream output: $m"))(materializer)

除了从队列中接收到的最后一个元素没有被流拾取之外,这一切似乎都有效。 即,如果我 post 四个项目到 sqs,只有 3 个项目被流打印出来(项目“2”丢失)。我得到的输出是:

Received from sqs: List(1)
Received from sqs: List(3, 4, 2)
Stream output: 1
Stream output: 3
Stream output: 4
Received from sqs: List()
Received from sqs: List()

缺少的元素 (2) 实际上确实出现了,但如果我 post 更多元素:

Received from sqs: List(5)
Stream output: 2
Received from sqs: List(6)
Stream output: 5

有什么想法吗?

正如我在评论部分所写,我认为问题的根源在于 Streams 以及惰性尾评估被用作中介这一事实。

不需要Stream组件,一个Iterator就可以解决问题:

val messageListIterator : () => Iterator[Iterable[SqsMessage]] = 
  () => Iterator continually queryForMessages

val messageIterator : () => Iterator[SqsMessage] = 
  () => messageListIterator() flatMap identity

这现在可以在你的 akka 流中使用 Source:

Source
  .fromIterator(messageIterator)
  .map(_.getBody)
  .runForeach(m => println(s"Stream output: $m"))(materializer)

我已经相应地更新了 the linked question