Streaming bytes to network websocket
I have code that uses a file handle to simulate the sink for a streaming ByteString coming from a source (AWS S3). If we want to use Network.WebSockets as the sink, would it suffice to swap LBS.writeFile in the code below with sendBinaryData (with the connection handle)?
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Control.Monad.Trans.Resource (runResourceT)
import Data.Text as T (Text)

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text -> IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0
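What confuses me is how the end of the stream is determined. For a file, this is handled by the writeFile API. What about sendBinaryData? Does it handle termination in a similar way to writeFile, or is that left to the data parser on the client side?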
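Update
This question is about how to stream data to a websocket handle (let's assume the handle is already provided), the same way we handle the file handle in the example above, and not really about how to manage the handle inside ResourceT. conduit does seem to take the mapM_ approach to sinking data, so that does seem to be the way to go.
The termination question comes from this line of thought: if we have a function listening for data on the other end of the websocket handle, then determining the end of a message seems important in a streaming context. Given a function like:
f :: LBS.ByteString -> a
if we S.mapM_ the data to the websocket handle, does that take care of adding some kind of end-of-stream marker, so that f, listening on the other end, can stop processing the lazy bytestring? Otherwise f will not know when the message is complete.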
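To make the termination question concrete: in Network.WebSockets, each sendBinaryData call sends one complete WebSocket message, and receiveData on the other side returns one such message at a time. Nothing marks the end of the overall S3 stream unless the sender adds it, for example by closing the connection with sendClose. The sketch below is a minimal pure model of that situation, assuming a hypothetical Event type (not part of the websockets package) where each chunk becomes one Binary message and Close stands in for the close frame; it needs only the bytestring package.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- Hypothetical model of what the receiver sees on the wire.
-- This is NOT a type from the websockets package.
data Event = Binary B.ByteString | Close
  deriving (Show, Eq)

-- Sender side: one message per chunk (like one sendBinaryData per chunk),
-- followed by a close frame (like sendClose) to mark the end.
send :: [B.ByteString] -> [Event]
send chunks = map Binary chunks ++ [Close]

-- Receiver side: the payload is only known to be complete once the close
-- frame (or some other agreed end marker) arrives.
reassemble :: [Event] -> Maybe BL.ByteString
reassemble = go []
  where
    go acc (Binary c : rest) = go (c : acc) rest
    go acc (Close : _)       = Just (BL.fromChunks (reverse acc))
    go _   []                = Nothing -- cut off before any end marker
```

For example, `reassemble (send (map BC.pack ["hello ", "world"]))` evaluates to `Just "hello world"`, while the same chunks without the trailing `Close` give `Nothing`: the receiver cannot tell a finished stream from one that is still in progress, which is exactly the problem `f` faces. With the real library, a receiver looping on receiveData typically learns about the close frame through a `ConnectionException` (`CloseRequest`) being thrown.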
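You're right that the handle needs some extra trickery. However, since you're already using the ResourceT monad transformer, this is delightfully simple to do with allocate. allocate lets you create a handle in the resource monad and register a cleanup action (in your case, just closing the connection).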
ok <- runResourceT $ do
  (releaseKey, handle) <-
    allocate (WebSockets.acceptRequest request)
             (`WebSockets.sendClose` closeMessage)
  WebSockets.sendBinaryData handle payload
  return result
  where
    request      = ...
    closeMessage = ...
    payload      = ... -- the bytes to send
    result       = ...
By using allocate, the handle is guaranteed to be closed before runResourceT returns its result.
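The guarantee that allocate gives inside runResourceT is the same acquire/release pattern that `bracket` from Control.Exception provides in plain IO. The sketch below deliberately swaps in `bracket` instead of ResourceT so that it runs with base alone; `openConn` and `closeConn` are hypothetical stand-ins for WebSockets.acceptRequest and WebSockets.sendClose that only append to a log, so the order of events can be checked, including when the body throws.

```haskell
import Control.Exception (IOException, bracket, throwIO, try)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical stand-in for WebSockets.acceptRequest.
openConn :: IORef [String] -> IO String
openConn logRef = modifyIORef logRef ("open" :) >> pure "conn"

-- Hypothetical stand-in for WebSockets.sendClose.
closeConn :: IORef [String] -> String -> IO ()
closeConn logRef _conn = modifyIORef logRef ("close" :)

-- Like allocate + runResourceT, bracket runs the release action once the
-- body is done, whether the body returns normally or throws.
session :: IORef [String] -> Bool -> IO ()
session logRef failMidway =
  bracket (openConn logRef) (closeConn logRef) $ \_conn -> do
    modifyIORef logRef ("send" :)
    if failMidway
      then throwIO (userError "S3 stream failed") -- simulated failure
      else pure ()
```

One difference worth keeping in mind: with allocate the cleanup runs when the enclosing runResourceT exits (or earlier, via `release` on the returned key), whereas `bracket` ties it to the lexical scope of its body.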
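I'm not entirely sure this is what you want, though. It seems to me that getObject shouldn't know how to accept and close WS connections; perhaps it should take a WS connection handle as an argument and then write to it. If you upgrade its return type to ResourceT, you can make the caller of getObject responsible for calling runResourceT, allocating the WS handle, and so on. But hopefully the example above is enough to get you going.
(Caveat: the code is untested.)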
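Your code reopens the output file and appends to it every time a data packet comes in. A better solution is to use LBS.hPutStr to write to the file through an already-open file handle.
That is, instead of:
S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
you want to use:
S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj
Of course, this refers to a handle h, so where does that come from?
One solution is to pass it to getObject, or to create it before running the body of getObject, e.g.: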
-- withFile and WriteMode come from System.IO
getObject cfg bucket key = withFile "output" WriteMode $ \h -> do
  req <- ...
  ...
  S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj
  ...
Or perhaps you have to create it inside runResourceT... I'm not sure.
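A minimal sketch of the single-handle approach, assuming the S3 chunks are available as a plain list of strict ByteStrings (a stand-in for the real stream); `writeChunks` is a hypothetical helper name. Because the chunks are already strict, `B.hPut` can write them directly without the `LBS.fromStrict` conversion, and opening the file in `WriteMode` truncates it, which takes the place of the initial `LBS.writeFile "testaws" LBS.empty` step. `withBinaryFile` is the binary-mode variant of `withFile`.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import System.IO (IOMode (WriteMode), withBinaryFile)

-- Open the file once, write every chunk through the same handle, and let
-- withBinaryFile close the handle when the loop finishes (or throws).
writeChunks :: FilePath -> [B.ByteString] -> IO ()
writeChunks path chunks =
  withBinaryFile path WriteMode $ \h ->
    mapM_ (B.hPut h) chunks -- one write per chunk, no reopening
```

For example, `writeChunks "testaws" (map BC.pack ["ab", "cd"])` leaves a file containing `abcd`, and running it again overwrites rather than appends.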
Update: see @haoformayor's answer for how to let ResourceT manage the file handle for you.
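Here are a few details that may make things easier to follow. First, for a small initial demo that modifies your getObject, I use Streaming.ByteString.writeFile, which lives in ResourceT anyway, to cut out the detour through lazy bytestrings.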
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as HTTP
import qualified Data.ByteString.Streaming as SB
import qualified Data.ByteString.Streaming.Internal as SB
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)
import qualified Network.WebSockets as WebSockets
import Control.Monad.Trans.Resource

data AwsConfig a = AwsConfig { _aws_cfg     :: Aws.Configuration
                             , _aws_s3cfg   :: S3.S3Configuration a
                             , _aws_httpmgr :: HTTP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text -> IO Int
getObject cfg file bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    let bytestream = do
          -- lookup "content-length" (S3.omUserMetadata mdata))
          SB.chunk B.empty -- this will be replaced by content-length
          hoist lift (HTTP.responseBody rsp) $$+- CL.mapM_ SB.chunk
    SB.writeFile file bytestream ) -- this is in ResourceT
  case req of
    Left _ -> return 2
    Right _ -> return 0
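The placeholder chunk above is meant to carry the content length. One way a receiver could use such a header to detect the end of the stream is sketched below. The framing (an 8-byte big-endian length in the first message, raw chunks after it) is a hypothetical application-level protocol, not something Network.WebSockets does for you, and `lengthHeader`, `parseHeader` and `collect` are illustrative names. Messages are modelled as a plain list of strict ByteStrings so the sketch runs with the bytestring package alone.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import Data.Bits (shiftL, (.|.))
import Data.Word (Word64)

-- Hypothetical header message: total payload length, 8 bytes, big-endian.
lengthHeader :: Word64 -> B.ByteString
lengthHeader n = BL.toStrict (BB.toLazyByteString (BB.word64BE n))

-- Decode the 8-byte header back into a length.
parseHeader :: B.ByteString -> Maybe Word64
parseHeader bs
  | B.length bs == 8 = Just (B.foldl' (\acc w -> (acc `shiftL` 8) .|. fromIntegral w) 0 bs)
  | otherwise        = Nothing

-- Receiver: read the header, then accept chunks until exactly the
-- advertised number of bytes has arrived.
collect :: [B.ByteString] -> Maybe BL.ByteString
collect [] = Nothing
collect (hdr : rest) = do
  total <- parseHeader hdr
  go total [] rest
  where
    go :: Word64 -> [B.ByteString] -> [B.ByteString] -> Maybe BL.ByteString
    go 0 acc _ = Just (BL.fromChunks (reverse acc)) -- all bytes received
    go remaining acc (c : cs)
      | len <= remaining = go (remaining - len) (c : acc) cs
      | otherwise        = Nothing -- more bytes than the header promised
      where
        len = fromIntegral (B.length c)
    go _ _ [] = Nothing -- messages ran out before the advertised length
```

With a length header the receiver no longer depends on the close frame to know the payload is complete, at the cost of knowing the length up front.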
We can more or less abstract out what you are doing with SB.writeFile:
getObjectAbstracted
  :: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
  -> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
getObjectAbstracted action cfg bucket key = do
  S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
    Aws.pureAws (_aws_cfg cfg)
                (_aws_s3cfg cfg)
                (_aws_httpmgr cfg)
                (S3.getObject bucket key)
  action (hoist lift (HTTP.responseBody rsp) $$+- CL.mapM_ SB.chunk)
Now we need a small helper that is missing from the streaming-bytestring library:
mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
mapMChunks_ act bytestream = do
  (_ S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
  return r
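To see what the helper does, here is a self-contained toy version. The `Bytes` type is a hypothetical stand-in with a shape similar to streaming-bytestring's stream type (a chunk followed by the rest, a monadic step, or the end carrying a return value `r`); it is not that library's API. The point is that `mapMChunks_` runs the action on every chunk in order and hands back the stream's final `r`, which is what the pattern match on `S.:>` recovers in the version above.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical toy stream: NOT the streaming-bytestring type itself.
data Bytes m r
  = Chunk B.ByteString (Bytes m r) -- a chunk, then the rest of the stream
  | Go (m (Bytes m r))             -- an effect that yields more stream
  | Empty r                        -- end of stream with a return value

-- Run an action on each chunk, in order, and return the final value.
mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> Bytes m r -> m r
mapMChunks_ act = loop
  where
    loop (Chunk c rest) = act c >> loop rest
    loop (Go m)         = m >>= loop
    loop (Empty r)      = return r
```

In `writeConnection` below, the action is `sendBinaryData`, so each chunk is sent as its own WebSocket message.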
and then we can proceed more or less along the lines of @haoformayor's plan, using streaming bytestrings:
writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
writeConnection connection =
  mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)

-- following `haoformayor`
connectWrite
  :: (MonadResource m, WebSockets.WebSocketsData a)
  => WebSockets.PendingConnection
  -> a                 -- closing message
  -> SB.ByteString m r -- stream from aws
  -> m r
connectWrite request closeMessage bytestream = do
  (releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
                                       (`WebSockets.sendClose` closeMessage)
  writeConnection connection bytestream

getObjectWS :: WebSockets.WebSocketsData a
            => WebSockets.PendingConnection
            -> a
            -> AwsConfig Aws.NormalQuery
            -> S3.Bucket
            -> Text
            -> ResourceT IO ()
getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)
Of course, none of this so far takes advantage of any of the differences between conduit and streaming/streaming-bytestring.