运行 通过多个管道的 fs2 流,然后合并结果的最佳方法是什么?
What's the best way to run an fs2 stream through multiple pipes, then combine the results?
我正在尝试构建内容可寻址的文件存储。这个过程很简单:获取字节流并将其写入临时位置,同时计算流内容的哈希值,然后一旦流完成,将完全写入的临时对象移动到基于该哈希值的最终位置.
基本上就像这个 Conduit 示例,但比支持它的文件系统更健壮:
storeObject dataDir srcStream =
let
sinks = liftA2 (,)
(ZipSink (sinkTempFile (dataDir </> "tmp") "ftmp"))
(ZipSink sinkHash)
in do
(tempfile, hash) <- runConduitRes (srcStream .| getZipSink sinks)
renameFile tempfile (dataDir </> "data" </> unpack (convert hash))
return (convert (hash :: Digest SHA256))
对于 fs2,我能在 forking streams () 上找到的最佳答案将我引向如下内容:
def zipPipes[F[_]: Functor: Concurrent, A, B, C]
(p1: Pipe[F, A, B], p2: Pipe[F, A, C]):
Pipe[F, A, (B, C)] = stream =>
Stream.eval(for {
q <- Queue.noneTerminated[F, A]
} yield {
stream
.evalTap(a => q.enqueue1(Some(a)))
.onFinalize(q.enqueue1(None))
.through(p1)
.zip(q.dequeue.through(p2))
}
).flatten[F, (B, C)]
(免责声明:我没有验证上面的代码除了编译之外还做了什么)
但我不知道,这一堆管道看起来很简陋,我觉得我缺少一个明显的替代方案?
你并没有遗漏任何东西。您可以与 Topic
共享流,例如:
def shareN[F[_]: Concurrent, A](n: Int): fs2.Pipe[F, A, List[fs2.Stream[F, A]]] = { src =>
fs2.Stream.eval(Topic[F, A]).flatMap { topic =>
fs2.Stream(List.fill(n)(topic.subscribe(1))).concurrently(
topic.subscribers.find(_ == n) >> topic.publish(src)
)
}
}
会给你一个固定大小的列表,有一个限制,你必须并行消耗所有东西,否则你会死锁。这可以通过 shapeless 变得类型安全,但这是一个不同的问题。
您也很可能可以将散列输入 fs2.Stream#mapAccumulate
并得到一个元组作为结果。
我正在尝试构建内容可寻址的文件存储。这个过程很简单:获取字节流并将其写入临时位置,同时计算流内容的哈希值,然后一旦流完成,将完全写入的临时对象移动到基于该哈希值的最终位置.
基本上就像这个 Conduit 示例,但比支持它的文件系统更健壮:
storeObject dataDir srcStream =
let
sinks = liftA2 (,)
(ZipSink (sinkTempFile (dataDir </> "tmp") "ftmp"))
(ZipSink sinkHash)
in do
(tempfile, hash) <- runConduitRes (srcStream .| getZipSink sinks)
renameFile tempfile (dataDir </> "data" </> unpack (convert hash))
return (convert (hash :: Digest SHA256))
对于 fs2,我能在 forking streams (
def zipPipes[F[_]: Functor: Concurrent, A, B, C]
(p1: Pipe[F, A, B], p2: Pipe[F, A, C]):
Pipe[F, A, (B, C)] = stream =>
Stream.eval(for {
q <- Queue.noneTerminated[F, A]
} yield {
stream
.evalTap(a => q.enqueue1(Some(a)))
.onFinalize(q.enqueue1(None))
.through(p1)
.zip(q.dequeue.through(p2))
}
).flatten[F, (B, C)]
(免责声明:我没有验证上面的代码除了编译之外还做了什么)
但我不知道,这一堆管道看起来很简陋,我觉得我缺少一个明显的替代方案?
你并没有遗漏任何东西。您可以与 Topic
共享流,例如:
def shareN[F[_]: Concurrent, A](n: Int): fs2.Pipe[F, A, List[fs2.Stream[F, A]]] = { src =>
fs2.Stream.eval(Topic[F, A]).flatMap { topic =>
fs2.Stream(List.fill(n)(topic.subscribe(1))).concurrently(
topic.subscribers.find(_ == n) >> topic.publish(src)
)
}
}
会给你一个固定大小的列表,有一个限制,你必须并行消耗所有东西,否则你会死锁。这可以通过 shapeless 变得类型安全,但这是一个不同的问题。
您也很可能可以将散列输入 fs2.Stream#mapAccumulate
并得到一个元组作为结果。