如何将类型化流程通过管道传输到 wai-conduit 的 responseSource?
How to Pipe Typed Process to wai-conduit's responseSource?
我想要扭曲 运行 一个进程,然后用该进程的输出响应。假定输出大于服务器的 RAM;加载整个输出然后响应不是一种选择。我以为我可以使用类似
的东西来完成这个
withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)
但 responseSource
uses ConduitT i (Flush Builder) IO ()
and createSource
使用 ConduitT i ByteString m ()
。我不知道如何将 ByteString
管道转换为 Flush Builder
管道。
所以我设计了一个似乎可行的解决方案,但遗憾的是它的定义不太简单:
responseProcess :: Status -> ResponseHeaders -> ProcessConfig in out err -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
let loop = do
bs <- hGetSome h defaultChunkSize
unless (BS.null bs) (send (byteString bs) *> flush *> loop)
in loop *> hClose h
。这是必要的吗,即使我可能会尝试通过包装 mkStreamSpec
或其他东西来美化它?还是我缺少更简单的方法?
编辑:对解决方案的评论:
intersperseC
让我可以同时使用 Chunk
和 Flush
。这解决了 Flush Builder
/ByteString
转换问题。我还没有测试过它,但它看起来不错,我相信它已经被使用过。
然而,我发现
withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)
过早关闭进程句柄。因此我需要自己管理管道:使用 createPipe
而不是 createSource
。但这意味着我需要在最后调用 hClose
,这意味着我需要一个 returns IO ()
的响应处理程序;唯一这样做的(除了 responseRaw
)是 responseStream
,它使用 StreamingBody
作为 Conduit 的 替代品 。因此我得出结论,我的原始解决方案是必需的,并且 Conduit 不能用于流处理。如果不正确,请随时更正。
responseSource
有类型
responseSource :: Status -> ResponseHeaders -> Source IO (Flush
Builder) -> Response
而 Flush
的定义是
data Flush a = Chunk a | Flush
也就是说,类型 Flush Builder
的值是 Builder
或指示 warp 刷新输出流的命令。
Builder
is from the binary package. It's basically a representation of a chunk of bytes, optimized for efficient concatenation. And it can be constructed from a ByteString
, using the fromByteString
函数。
知道这一点,并使用 conduit 中的 mapC
,我们可以定义这个适配器:
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString)
但是有一个问题,适配器从不刷新。但是我们可以通过intersperseC
:
穿插flusing命令
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) .| intersperseC Flush
如果我们不想在每个块后刷新怎么办?也许我们可以在将字节块转换为 Flush
值之前使用 chunksOfCE
对字节块进行分组。
我想要扭曲 运行 一个进程,然后用该进程的输出响应。假定输出大于服务器的 RAM;加载整个输出然后响应不是一种选择。我以为我可以使用类似
的东西来完成这个withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)
但 responseSource
uses ConduitT i (Flush Builder) IO ()
and createSource
使用 ConduitT i ByteString m ()
。我不知道如何将 ByteString
管道转换为 Flush Builder
管道。
所以我设计了一个似乎可行的解决方案,但遗憾的是它的定义不太简单:
responseProcess :: Status -> ResponseHeaders -> ProcessConfig in out err -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
let loop = do
bs <- hGetSome h defaultChunkSize
unless (BS.null bs) (send (byteString bs) *> flush *> loop)
in loop *> hClose h
。这是必要的吗,即使我可能会尝试通过包装 mkStreamSpec
或其他东西来美化它?还是我缺少更简单的方法?
编辑:对解决方案的评论:
intersperseC
让我可以同时使用 Chunk
和 Flush
。这解决了 Flush Builder
/ByteString
转换问题。我还没有测试过它,但它看起来不错,我相信它已经被使用过。
然而,我发现
withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)
过早关闭进程句柄。因此我需要自己管理管道:使用 createPipe
而不是 createSource
。但这意味着我需要在最后调用 hClose
,这意味着我需要一个 returns IO ()
的响应处理程序;唯一这样做的(除了 responseRaw
)是 responseStream
,它使用 StreamingBody
作为 Conduit 的 替代品 。因此我得出结论,我的原始解决方案是必需的,并且 Conduit 不能用于流处理。如果不正确,请随时更正。
responseSource
有类型
responseSource :: Status -> ResponseHeaders -> Source IO (Flush Builder) -> Response
而 Flush
的定义是
data Flush a = Chunk a | Flush
也就是说,类型 Flush Builder
的值是 Builder
或指示 warp 刷新输出流的命令。
Builder
is from the binary package. It's basically a representation of a chunk of bytes, optimized for efficient concatenation. And it can be constructed from a ByteString
, using the fromByteString
函数。
知道这一点,并使用 conduit 中的 mapC
,我们可以定义这个适配器:
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString)
但是有一个问题,适配器从不刷新。但是我们可以通过intersperseC
:
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) .| intersperseC Flush
如果我们不想在每个块后刷新怎么办?也许我们可以在将字节块转换为 Flush
值之前使用 chunksOfCE
对字节块进行分组。