运行 通过多个管道的 fs2 流,然后合并结果的最佳方法是什么?

What's the best way to run an fs2 stream through multiple pipes, then combine the results?

我正在尝试构建内容可寻址的文件存储。这个过程很简单:获取字节流并将其写入临时位置,同时计算流内容的哈希值,然后一旦流完成,将完全写入的临时对象移动到基于该哈希值的最终位置.

基本上就像这个 Conduit 示例,但比支持它的文件系统更健壮:

storeObject dataDir srcStream = 
  let
     sinks = liftA2 (,)
       (ZipSink (sinkTempFile (dataDir </> "tmp") "ftmp"))
       (ZipSink sinkHash)
  in do
      (tempfile, hash) <- runConduitRes (srcStream .| getZipSink sinks)
      renameFile tempfile (dataDir </> "data" </> unpack (convert hash))
      return (convert (hash :: Digest SHA256))

对于 fs2,我能在 forking streams () 上找到的最佳答案将我引向如下内容:

  def zipPipes[F[_]: Functor: Concurrent, A, B, C]
   (p1: Pipe[F, A, B], p2: Pipe[F, A, C]): 
   Pipe[F, A, (B, C)] = stream => 
      Stream.eval(for {
        q <- Queue.noneTerminated[F, A]
        } yield {
          stream
            .evalTap(a => q.enqueue1(Some(a)))
            .onFinalize(q.enqueue1(None))
            .through(p1)
            .zip(q.dequeue.through(p2))
        }
      ).flatten[F, (B, C)]

(免责声明:我没有验证上面的代码除了编译之外还做了什么)

但我不知道,这一堆管道看起来很简陋,我觉得我缺少一个明显的替代方案?

你并没有遗漏任何东西。您可以与 Topic 共享流,例如:

def shareN[F[_]: Concurrent, A](n: Int): fs2.Pipe[F, A, List[fs2.Stream[F, A]]] = { src =>
  fs2.Stream.eval(Topic[F, A]).flatMap { topic =>
    fs2.Stream(List.fill(n)(topic.subscribe(1))).concurrently(
      topic.subscribers.find(_ == n) >> topic.publish(src)
    )
  }
}

会给你一个固定大小的列表,有一个限制,你必须并行消耗所有东西,否则你会死锁。这可以通过 shapeless 变得类型安全,但这是一个不同的问题。

您也很可能可以将散列输入 fs2.Stream#mapAccumulate 并得到一个元组作为结果。