将字节串流式传输为 WAI HTTP 服务器响应主体
Streaming bytestring as WAI HTTP server response body
我有一个值 body :: BS.ByteString (ResourceT IO) ()
,来自一个基于 BS.readFile
. I want to stream that value as the response body from a Wai Application
. There's a helper, streamingResponse
that takes a value of the type Stream (Of ByteString) IO r
. I'm able to convert my BS.ByteString (ResourceT IO) ()
to Stream (Of ByteString) (ResourceT IO) ()
through the use of BS.toChunks
的函数,但它包含一个额外的 ResourceT
monad 层。将 body
传递给 streamingResponse
给我:
Couldn't match type ‘ResourceT IO’ with ‘IO’
Expected type: Stream (Of ByteString) IO ()
Actual type: Stream (Of ByteString) (ResourceT IO) ()
我尝试过各种方法,例如在 runResourceT 中包装东西、绑定和提升值等,但真的不知道如何进行。如果需要额外的上下文,Here 是完整项目中的行。
更新 0
hoist runResourceT body
好像是打字检查。有人还向我介绍了 Haskell 管道 thread,这可能是一个非常相关的问题,并且可能暗示了解决方案。
而不是 readFile
, would withFile
+ hSetBinaryMode
+Data.ByteString.Streaming.fromHandle
就够了吗?
fromHandle
生成一个 ByteString IO ()
,它可以转换为 streamingResponse
或 streamingBody
可以接受的 Stream (Of ByteString) IO ()
。
存在放置 withFile
包围操作的位置的问题。根据 WAI documentation,您可以用它包装 Application
构建函数的结果:
Note that, since WAI 3.0, this type is structured in continuation
passing style to allow for proper safe resource handling. This was
handled in the past via other means (e.g., ResourceT).
[...]
In order to allocate resources in an exception-safe manner, you can
use the bracket pattern outside of the call to responseStream.
注意: streaming-bytestring 的文档说 fromHandle
会在到达 EOF 时自动关闭句柄。查看实现,that doesn't seem to be the case。您需要 withFile
才能正确关闭手柄。
如果我们想要允许 Stream
存在于 ResourceT
中,我们可以不使用 streaming-wai (that only work for Stream
s based on IO
) and instead build on top of functions like responseStream
from network-wai 中的函数:
import Control.Monad.Trans.Resource
import Network.Wai
import Streaming
import qualified Streaming.Prelude as S
import Data.ByteString.Builder (byteString, Builder)
streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
-> Status
-> ResponseHeaders
-> Response
streamingResponseR stream status headers =
responseStream status headers streamingBody
where
streamingBody writeBuilder flush =
let writer a =
do liftIO (writeBuilder (byteString a))
-- flushes for every produced bytestring, perhaps not optimal
liftIO flush
in runResourceT $ void $ S.effects $ S.for stream writer
streamingBody
的类型为 StreamingBody
,它实际上是一个函数 (Builder -> IO ()) -> IO () -> IO ()
的类型同义词,该函数将写入回调和刷新回调作为参数,并使用它们来编写使用范围内的某些数据源进行响应。 (请注意,这些回调 由 WAI 提供,而不是由用户提供。)
在我们的例子中,数据源是 Stream
中的 ResourceT
。我们需要使用 liftIO
提升写入和刷新回调(位于 IO
中),还记得调用 runResourceT
到 return 一个普通的 IO
操作最后。
如果我们只想在发出的字节串的累积长度达到某个限制后刷新响应怎么办?
我们需要一个函数(此处未实现)在每次达到限制时创建一个除法:
breaks' :: Monad m
=> Int
-> Stream (Of ByteString) m r
-> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
然后我们可以在写入流之前使用 intercalates
在每个组之间插入刷新操作:
streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) ()
-> Int
-> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
let writer a = liftIO (writeBuilder (byteString a))
flusher = liftIO flush
broken = breaks' breakSize stream
in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken
我有一个值 body :: BS.ByteString (ResourceT IO) ()
,来自一个基于 BS.readFile
. I want to stream that value as the response body from a Wai Application
. There's a helper, streamingResponse
that takes a value of the type Stream (Of ByteString) IO r
. I'm able to convert my BS.ByteString (ResourceT IO) ()
to Stream (Of ByteString) (ResourceT IO) ()
through the use of BS.toChunks
的函数,但它包含一个额外的 ResourceT
monad 层。将 body
传递给 streamingResponse
给我:
Couldn't match type ‘ResourceT IO’ with ‘IO’
Expected type: Stream (Of ByteString) IO ()
Actual type: Stream (Of ByteString) (ResourceT IO) ()
我尝试过各种方法,例如在 runResourceT 中包装东西、绑定和提升值等,但真的不知道如何进行。如果需要额外的上下文,Here 是完整项目中的行。
更新 0
hoist runResourceT body
好像是打字检查。有人还向我介绍了 Haskell 管道 thread,这可能是一个非常相关的问题,并且可能暗示了解决方案。
而不是 readFile
, would withFile
+ hSetBinaryMode
+Data.ByteString.Streaming.fromHandle
就够了吗?
fromHandle
生成一个 ByteString IO ()
,它可以转换为 streamingResponse
或 streamingBody
可以接受的 Stream (Of ByteString) IO ()
。
存在放置 withFile
包围操作的位置的问题。根据 WAI documentation,您可以用它包装 Application
构建函数的结果:
Note that, since WAI 3.0, this type is structured in continuation passing style to allow for proper safe resource handling. This was handled in the past via other means (e.g., ResourceT).
[...]
In order to allocate resources in an exception-safe manner, you can use the bracket pattern outside of the call to responseStream.
注意: streaming-bytestring 的文档说 fromHandle
会在到达 EOF 时自动关闭句柄。查看实现,that doesn't seem to be the case。您需要 withFile
才能正确关闭手柄。
如果我们想要允许 Stream
存在于 ResourceT
中,我们可以不使用 streaming-wai (that only work for Stream
s based on IO
) and instead build on top of functions like responseStream
from network-wai 中的函数:
import Control.Monad.Trans.Resource
import Network.Wai
import Streaming
import qualified Streaming.Prelude as S
import Data.ByteString.Builder (byteString, Builder)
streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
-> Status
-> ResponseHeaders
-> Response
streamingResponseR stream status headers =
responseStream status headers streamingBody
where
streamingBody writeBuilder flush =
let writer a =
do liftIO (writeBuilder (byteString a))
-- flushes for every produced bytestring, perhaps not optimal
liftIO flush
in runResourceT $ void $ S.effects $ S.for stream writer
streamingBody
的类型为 StreamingBody
,它实际上是一个函数 (Builder -> IO ()) -> IO () -> IO ()
的类型同义词,该函数将写入回调和刷新回调作为参数,并使用它们来编写使用范围内的某些数据源进行响应。 (请注意,这些回调 由 WAI 提供,而不是由用户提供。)
在我们的例子中,数据源是 Stream
中的 ResourceT
。我们需要使用 liftIO
提升写入和刷新回调(位于 IO
中),还记得调用 runResourceT
到 return 一个普通的 IO
操作最后。
如果我们只想在发出的字节串的累积长度达到某个限制后刷新响应怎么办?
我们需要一个函数(此处未实现)在每次达到限制时创建一个除法:
breaks' :: Monad m
=> Int
-> Stream (Of ByteString) m r
-> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
然后我们可以在写入流之前使用 intercalates
在每个组之间插入刷新操作:
streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) ()
-> Int
-> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
let writer a = liftIO (writeBuilder (byteString a))
flusher = liftIO flush
broken = breaks' breakSize stream
in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken