Akka Streams:缺少源代码中的最后一个元素
Akka Streams: missing last element in Source
我正在关注 this answer
使用 Akka Streams 创建 SQS 消费者:
def queryForMessages = {
val messages = Sqs.receive(queueUrl, 3, 10)
println(s"Received from sqs: ${messages.map(_.getBody)}")
messages
}
def messageListStream : immutable.Stream[Iterable[SqsMessage]] = {
queryForMessages #:: messageListStream
}
def messageIterator() : Iterator[SqsMessage] = messageListStream.flatten.toIterator
Source.fromIterator(messageIterator)
.map(_.getBody)
.runForeach(m => println(s"Stream output: $m"))(materializer)
除了从队列中接收到的最后一个元素没有被流拾取之外,这一切似乎都有效。
即,如果我 post 四个项目到 sqs,只有 3 个项目被流打印出来(项目“2”丢失)。我得到的输出是:
Received from sqs: List(1)
Received from sqs: List(3, 4, 2)
Stream output: 1
Stream output: 3
Stream output: 4
Received from sqs: List()
Received from sqs: List()
缺少的元素 (2) 实际上确实出现了,但如果我 post 更多元素:
Received from sqs: List(5)
Stream output: 2
Received from sqs: List(6)
Stream output: 5
有什么想法吗?
正如我在评论部分所写,我认为问题的根源在于 Streams 以及惰性尾评估被用作中介这一事实。
不需要Stream组件,一个Iterator
就可以解决问题:
val messageListIterator : () => Iterator[Iterable[SqsMessage]] =
() => Iterator continually queryForMessages
val messageIterator : () => Iterator[SqsMessage] =
() => messageListIterator() flatMap identity
这现在可以在你的 akka 流中使用 Source
:
Source
.fromIterator(messageIterator)
.map(_.getBody)
.runForeach(m => println(s"Stream output: $m"))(materializer)
我已经相应地更新了 the linked question。
我正在关注 this answer 使用 Akka Streams 创建 SQS 消费者:
def queryForMessages = {
val messages = Sqs.receive(queueUrl, 3, 10)
println(s"Received from sqs: ${messages.map(_.getBody)}")
messages
}
def messageListStream : immutable.Stream[Iterable[SqsMessage]] = {
queryForMessages #:: messageListStream
}
def messageIterator() : Iterator[SqsMessage] = messageListStream.flatten.toIterator
Source.fromIterator(messageIterator)
.map(_.getBody)
.runForeach(m => println(s"Stream output: $m"))(materializer)
除了从队列中接收到的最后一个元素没有被流拾取之外,这一切似乎都有效。 即,如果我 post 四个项目到 sqs,只有 3 个项目被流打印出来(项目“2”丢失)。我得到的输出是:
Received from sqs: List(1)
Received from sqs: List(3, 4, 2)
Stream output: 1
Stream output: 3
Stream output: 4
Received from sqs: List()
Received from sqs: List()
缺少的元素 (2) 实际上确实出现了,但如果我 post 更多元素:
Received from sqs: List(5)
Stream output: 2
Received from sqs: List(6)
Stream output: 5
有什么想法吗?
正如我在评论部分所写,我认为问题的根源在于 Streams 以及惰性尾评估被用作中介这一事实。
不需要Stream组件,一个Iterator
就可以解决问题:
val messageListIterator : () => Iterator[Iterable[SqsMessage]] =
() => Iterator continually queryForMessages
val messageIterator : () => Iterator[SqsMessage] =
() => messageListIterator() flatMap identity
这现在可以在你的 akka 流中使用 Source
:
Source
.fromIterator(messageIterator)
.map(_.getBody)
.runForeach(m => println(s"Stream output: $m"))(materializer)
我已经相应地更新了 the linked question。