将字节串流式传输为 WAI HTTP 服务器响应主体

Streaming bytestring as WAI HTTP server response body

我有一个值 body :: BS.ByteString (ResourceT IO) (),来自一个基于 BS.readFile. I want to stream that value as the response body from a Wai Application. There's a helper, streamingResponse that takes a value of the type Stream (Of ByteString) IO r. I'm able to convert my BS.ByteString (ResourceT IO) () to Stream (Of ByteString) (ResourceT IO) () through the use of BS.toChunks 的函数,但它包含一个额外的 ResourceT monad 层。将 body 传递给 streamingResponse 给我:

Couldn't match type ‘ResourceT IO’ with ‘IO’
  Expected type: Stream (Of ByteString) IO ()
    Actual type: Stream (Of ByteString) (ResourceT IO) ()

我尝试过各种方法,例如在 runResourceT 中包装东西、绑定和提升值等,但真的不知道如何进行。如果需要额外的上下文,Here 是完整项目中的行。

更新 0

hoist runResourceT body 好像是打字检查。有人还向我介绍了 Haskell 管道 thread,这可能是一个非常相关的问题,并且可能暗示了解决方案。

而不是 readFile, would withFile + hSetBinaryMode +Data.ByteString.Streaming.fromHandle 就够了吗?

fromHandle 生成一个 ByteString IO (),它可以转换为 streamingResponsestreamingBody 可以接受的 Stream (Of ByteString) IO ()

存在放置 withFile 包围操作的位置的问题。根据 WAI documentation,您可以用它包装 Application 构建函数的结果:

Note that, since WAI 3.0, this type is structured in continuation passing style to allow for proper safe resource handling. This was handled in the past via other means (e.g., ResourceT).

[...]

In order to allocate resources in an exception-safe manner, you can use the bracket pattern outside of the call to responseStream.


注意: streaming-bytestring 的文档说 fromHandle 会在到达 EOF 时自动关闭句柄。查看实现,that doesn't seem to be the case。您需要 withFile 才能正确关闭手柄。

如果我们想要允许 Stream 存在于 ResourceT 中,我们可以不使用 streaming-wai (that only work for Streams based on IO) and instead build on top of functions like responseStream from network-wai 中的函数:

import           Control.Monad.Trans.Resource
import           Network.Wai                     
import           Streaming                   
import qualified Streaming.Prelude               as S
import           Data.ByteString.Builder (byteString, Builder)

streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
                   -> Status
                   -> ResponseHeaders
                   -> Response
streamingResponseR stream status headers =
    responseStream status headers streamingBody
    where
    streamingBody writeBuilder flush =
        let writer a =
                do liftIO (writeBuilder (byteString a))
                    -- flushes for every produced bytestring, perhaps not optimal
                   liftIO flush
         in runResourceT $ void $ S.effects $ S.for stream writer

streamingBody 的类型为 StreamingBody,它实际上是一个函数 (Builder -> IO ()) -> IO () -> IO () 的类型同义词,该函数将写入回调和刷新回调作为参数,并使用它们来编写使用范围内的某些数据源进行响应。 (请注意,这些回调 由 WAI 提供,而不是由用户提供。)

在我们的例子中,数据源是 Stream 中的 ResourceT。我们需要使用 liftIO 提升写入和刷新回调(位于 IO 中),还记得调用 runResourceT 到 return 一个普通的 IO 操作最后。


如果我们只想在发出的字节串的累积长度达到某个限制后刷新响应怎么办?

我们需要一个函数(此处未实现)在每次达到限制时创建一个除法:

breaks' :: Monad m 
        => Int 
        -> Stream (Of ByteString) m r 
        -> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined

然后我们可以在写入流之前使用 intercalates 在每个组之间插入刷新操作:

streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) () 
                  -> Int 
                  -> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
    let writer a = liftIO (writeBuilder (byteString a))
        flusher = liftIO flush
        broken = breaks' breakSize stream
     in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken