FS2:是否可以优雅地完成队列?
FS2: is it possible to complete Queue gracefully?
假设我想将一些传统的异步 API 转换为 FS2 流。
API 提供了一个具有 3 个回调的接口:下一个元素、成功、错误。
我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。
FS2 指南 (https://functional-streams-for-scala.github.io/fs2/guide.html) 建议在这种情况下使用 fs2.Queue
,
它非常适合入队,但到目前为止我看到的所有示例都希望 queue.dequeue
returns 的流永远不会完成 -
在我的情况下,没有明显的方法来处理 success/error 回调。
我尝试使用 queue.dequeue.interruptWhen(...here goes the signal...)
,但是如果 success/error 回调在客户端从流中读取数据之前到达,
流过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读。
FS2 可以做到吗?使用 Akka Streams 很简单——SourceQueueWithComplete
有 complete
和 fail
方法。
更新:
通过将元素包装在 Option 中并将 None 视为停止读取流的信号,另外通过使用 Promise 来传播错误,我能够获得足够好的结果:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
但是,我是否忽略了做这些事情的更自然的方式?
一种惯用的方法是创建 Queue[Option[A]]
而不是 Queue[A]
。入队时,包装在 Some
中,您可以显式入队 None
以表示完成。在出队方面,执行 q.dequeue.unNoneTerminate
,这会给你一个 Stream[F, A]
,一旦队列发出 None
就会终止
对您的更新的回答:将 unNoneTerminate
与 rethrow
合并,它需要一个 Stream[F, Either[Throwable, A]]
和 returns 一个 Stream[F, A]
,错误为 [=14] =] 当它遇到一个 throwable 时。
你的完整堆栈将是 Stream[F, Either[Throwable, Option[A]]]
,你通过调用 .rethrow.unNoneTerminate
解包成 Stream[F,A]
假设我想将一些传统的异步 API 转换为 FS2 流。 API 提供了一个具有 3 个回调的接口:下一个元素、成功、错误。 我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。
FS2 指南 (https://functional-streams-for-scala.github.io/fs2/guide.html) 建议在这种情况下使用 fs2.Queue
,
它非常适合入队,但到目前为止我看到的所有示例都希望 queue.dequeue
returns 的流永远不会完成 -
在我的情况下,没有明显的方法来处理 success/error 回调。
我尝试使用 queue.dequeue.interruptWhen(...here goes the signal...)
,但是如果 success/error 回调在客户端从流中读取数据之前到达,
流过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读。
FS2 可以做到吗?使用 Akka Streams 很简单——SourceQueueWithComplete
有 complete
和 fail
方法。
更新: 通过将元素包装在 Option 中并将 None 视为停止读取流的信号,另外通过使用 Promise 来传播错误,我能够获得足够好的结果:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
但是,我是否忽略了做这些事情的更自然的方式?
一种惯用的方法是创建 Queue[Option[A]]
而不是 Queue[A]
。入队时,包装在 Some
中,您可以显式入队 None
以表示完成。在出队方面,执行 q.dequeue.unNoneTerminate
,这会给你一个 Stream[F, A]
,一旦队列发出 None
对您的更新的回答:将 unNoneTerminate
与 rethrow
合并,它需要一个 Stream[F, Either[Throwable, A]]
和 returns 一个 Stream[F, A]
,错误为 [=14] =] 当它遇到一个 throwable 时。
你的完整堆栈将是 Stream[F, Either[Throwable, Option[A]]]
,你通过调用 .rethrow.unNoneTerminate
Stream[F,A]