解码 JSON 流,其中某些值先于其他值
Decoding JSON stream where some values are needed before others
假设我们有一个像这样的 JSON 对象(使用 base64 编码的字节串):
TaggedImage = TaggedImage { id :: Text, image :: ByteString }
现在,我们要从源接收 image
,并使用 id
标签中的信息将其存储在某个位置。因此,这意味着 id
必须提前解析(以确定图像的位置),而 image
以流方式解析。这是直截了当的做法吗?
我计划使用 pipes-aeson
、aws
(用于 S3
存储)和 pipes
从 Websocket
生产者使用 S3
桶作为消费者(在我们解析 id
以确定 S3
桶的位置之前无法创建)。查看 decoded
方法,我无法弄清楚我是否确实可以按照上面的要求进行操作。这是我第一次尝试在 JSON 和管道中进行流式传输。因此,我们将不胜感激。
读取和写入文件系统的简单示例也将作为 Websocket producer
和 S3 consumer
的替代。
附录
由于 JSON 键值对根据 RFC 无序,而数组是有序的,因此 image
数据可能先于 id
,看来,对于我在上面定义的数据类型。因此,将其更改为 JSON 数组(Haskell 中的一个元组,aeson
TH 推导似乎转换为有序数组)也可能有所帮助。如果需要,请随时更改数据类型定义,以强加解码顺序。例如,数据类型可能会更改为:
TaggedImage = TaggedImage (Text,ByteString)
我相信您将无法重用 pipes-aeson
库,因为它不提供流式传输已解码 JSON 记录的嵌套字段的方法,也不提供任何支持用于结构的类似光标的导航。这意味着您需要手动解析 JSON 记录的骨架。
另外,需要做一些工作来将 base64-bytestring
包装成 pipes
-like API 这种类型:
-- Convert a base64-encoded stream to a raw byte stream
decodeBase64
:: Producer ByteString m r
-- ^ Base64-encoded bytes
-> Producer ByteString m (Either SomeException (Producer ByteString m r))
-- ^ Raw bytes
请注意,如果解码成功完成,结果 return 是字节字符串剩余部分(即 base64 编码字节之后的所有内容)的 Producer
。这使您可以在图像字节结束的地方继续解析。
但是,假设您有一个 decodeBase64
函数,那么代码的工作方式的粗略轮廓是您将分为三个部分:
- 使用适合
pipes
的binary
解析器解析图像字节之前的记录前缀
- 使用
decodeBase64
函数流式传输解码图像字节
- 也使用适合
pipes
的binary
解析器解析图像字节后记录的后缀
换句话说,类型和实现大致如下所示:
-- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
skipPrefix :: Data.Binary.Get ()
skipPrefix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix’ = execStateT (Pipes.Binary.decodeGet skipPrefix)
— This would match the "' }" suffix of the JSON record
skipSuffix :: Data.Binary.Get ()
skipSuffix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix’ = execStateT (Pipes.Binary.decodeGet skipSuffix)
streamImage
:: Monad m
=> Producer ByteString m r
-> Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
e0 <- lift (skipPrefix’ p0)
case e0 of
Left exc -> return (Left (toException exc))
Right p1 -> do
e1 <- decodeBase64 p1
case e1 of
Left exc -> return (Left exc)
Right p2 -> do
e2 <- lift (skipSuffix’ p2)
case e2 of
Left exc -> return (Left (toException exc))
Right p3 -> return (Right p3)
换句话说,streamImage
会将 Producer
作为从 JSON 记录的第一个字符开始的输入,并且它将流式传输从中提取的解码图像字节记录。如果解码成功,那么它将 return 字节流的剩余部分紧跟在 JSON 记录之后。
假设我们有一个像这样的 JSON 对象(使用 base64 编码的字节串):
TaggedImage = TaggedImage { id :: Text, image :: ByteString }
现在,我们要从源接收 image
,并使用 id
标签中的信息将其存储在某个位置。因此,这意味着 id
必须提前解析(以确定图像的位置),而 image
以流方式解析。这是直截了当的做法吗?
我计划使用 pipes-aeson
、aws
(用于 S3
存储)和 pipes
从 Websocket
生产者使用 S3
桶作为消费者(在我们解析 id
以确定 S3
桶的位置之前无法创建)。查看 decoded
方法,我无法弄清楚我是否确实可以按照上面的要求进行操作。这是我第一次尝试在 JSON 和管道中进行流式传输。因此,我们将不胜感激。
读取和写入文件系统的简单示例也将作为 Websocket producer
和 S3 consumer
的替代。
附录
由于 JSON 键值对根据 RFC 无序,而数组是有序的,因此 image
数据可能先于 id
,看来,对于我在上面定义的数据类型。因此,将其更改为 JSON 数组(Haskell 中的一个元组,aeson
TH 推导似乎转换为有序数组)也可能有所帮助。如果需要,请随时更改数据类型定义,以强加解码顺序。例如,数据类型可能会更改为:
TaggedImage = TaggedImage (Text,ByteString)
我相信您将无法重用 pipes-aeson
库,因为它不提供流式传输已解码 JSON 记录的嵌套字段的方法,也不提供任何支持用于结构的类似光标的导航。这意味着您需要手动解析 JSON 记录的骨架。
另外,需要做一些工作来将 base64-bytestring
包装成 pipes
-like API 这种类型:
-- Convert a base64-encoded stream to a raw byte stream
decodeBase64
:: Producer ByteString m r
-- ^ Base64-encoded bytes
-> Producer ByteString m (Either SomeException (Producer ByteString m r))
-- ^ Raw bytes
请注意,如果解码成功完成,结果 return 是字节字符串剩余部分(即 base64 编码字节之后的所有内容)的 Producer
。这使您可以在图像字节结束的地方继续解析。
但是,假设您有一个 decodeBase64
函数,那么代码的工作方式的粗略轮廓是您将分为三个部分:
- 使用适合
pipes
的 - 使用
decodeBase64
函数流式传输解码图像字节 - 也使用适合
pipes
的
binary
解析器解析图像字节之前的记录前缀
binary
解析器解析图像字节后记录的后缀
换句话说,类型和实现大致如下所示:
-- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
skipPrefix :: Data.Binary.Get ()
skipPrefix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix’ = execStateT (Pipes.Binary.decodeGet skipPrefix)
— This would match the "' }" suffix of the JSON record
skipSuffix :: Data.Binary.Get ()
skipSuffix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix’ = execStateT (Pipes.Binary.decodeGet skipSuffix)
streamImage
:: Monad m
=> Producer ByteString m r
-> Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
e0 <- lift (skipPrefix’ p0)
case e0 of
Left exc -> return (Left (toException exc))
Right p1 -> do
e1 <- decodeBase64 p1
case e1 of
Left exc -> return (Left exc)
Right p2 -> do
e2 <- lift (skipSuffix’ p2)
case e2 of
Left exc -> return (Left (toException exc))
Right p3 -> return (Right p3)
换句话说,streamImage
会将 Producer
作为从 JSON 记录的第一个字符开始的输入,并且它将流式传输从中提取的解码图像字节记录。如果解码成功,那么它将 return 字节流的剩余部分紧跟在 JSON 记录之后。