如何将类型化流程通过管道传输到 wai-conduit 的 responseSource?

How to Pipe Typed Process to wai-conduit's responseSource?

我想要扭曲 运行 一个进程,然后用该进程的输出响应。假定输出大于服务器的 RAM;加载整个输出然后响应不是一种选择。我以为我可以使用类似

的东西来完成这个
withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)

responseSource uses ConduitT i (Flush Builder) IO () and createSource 使用 ConduitT i ByteString m ()。我不知道如何将 ByteString 管道转换为 Flush Builder 管道。


所以我设计了一个似乎可行的解决方案,但遗憾的是它的定义不太简单:

responseProcess :: Status -> ResponseHeaders -> ProcessConfig in out err -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->                                                                                                                                
     withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
         let loop = do
             bs <- hGetSome h defaultChunkSize
             unless (BS.null bs) (send (byteString bs) *> flush *> loop)
         in loop *> hClose h 

。这是必要的吗,即使我可能会尝试通过包装 mkStreamSpec 或其他东西来美化它?还是我缺少更简单的方法?


编辑:对解决方案的评论:

intersperseC 让我可以同时使用 ChunkFlush。这解决了 Flush Builder/ByteString 转换问题。我还没有测试过它,但它看起来不错,我相信它已经被使用过。

然而,我发现

withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
    responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)

过早关闭进程句柄。因此我需要自己管理管道:使用 createPipe 而不是 createSource。但这意味着我需要在最后调用 hClose,这意味着我需要一个 returns IO () 的响应处理程序;唯一这样做的(除了 responseRaw)是 responseStream,它使用 StreamingBody 作为 Conduit 的 替代品 。因此我得出结论,我的原始解决方案是必需的,并且 Conduit 不能用于流处理。如果不正确,请随时更正。

responseSource 有类型

responseSource :: Status -> ResponseHeaders -> Source IO (Flush Builder) -> Response

Flush 的定义是

data Flush a = Chunk a | Flush

也就是说,类型 Flush Builder 的值是 Builder 或指示 warp 刷新输出流的命令。

Builder is from the binary package. It's basically a representation of a chunk of bytes, optimized for efficient concatenation. And it can be constructed from a ByteString, using the fromByteString 函数。

知道这一点,并使用 conduit 中的 mapC,我们可以定义这个适配器:

adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) 

但是有一个问题,适配器从不刷新。但是我们可以通过intersperseC:

穿插flusing命令
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) .| intersperseC Flush

如果我们不想在每个块后刷新怎么办?也许我们可以在将字节块转换为 Flush 值之前使用 chunksOfCE 对字节块进行分组。