解码 JSON 流,其中某些值先于其他值

Decoding JSON stream where some values are needed before others

假设我们有一个像这样的 JSON 对象(使用 base64 编码的字节串):

TaggedImage = TaggedImage {  id :: Text, image :: ByteString }

现在,我们要从源接收 image,并使用 id 标签中的信息将其存储在某个位置。因此,这意味着 id 必须提前解析(以确定图像的位置),而 image 以流方式解析。这是直截了当的做法吗?

我计划使用 pipes-aesonaws(用于 S3 存储)和 pipesWebsocket 生产者使用 S3 桶作为消费者(在我们解析 id 以确定 S3 桶的位置之前无法创建)。查看 decoded 方法,我无法弄清楚我是否确实可以按照上面的要求进行操作。这是我第一次尝试在 JSON 和管道中进行流式传输。因此,我们将不胜感激。

读取和写入文件系统的简单示例也将作为 Websocket producerS3 consumer 的替代。

附录

由于 JSON 键值对根据 RFC 无序,而数组是有序的,因此 image 数据可能先于 id,看来,对于我在上面定义的数据类型。因此,将其更改为 JSON 数组(Haskell 中的一个元组,aeson TH 推导似乎转换为有序数组)也可能有所帮助。如果需要,请随时更改数据类型定义,以强加解码顺序。例如,数据类型可能会更改为:

TaggedImage = TaggedImage (Text,ByteString)

我相信您将无法重用 pipes-aeson 库,因为它不提供流式传输已解码 JSON 记录的嵌套字段的方法,也不提供任何支持用于结构的类似光标的导航。这意味着您需要手动解析 JSON 记录的骨架。

另外,需要做一些工作来将 base64-bytestring 包装成 pipes-like API 这种类型:

-- Convert a base64-encoded stream to a raw byte stream
decodeBase64
    :: Producer ByteString m r
    -- ^ Base64-encoded bytes
    -> Producer ByteString m (Either SomeException (Producer ByteString m r)) 
    -- ^ Raw bytes

请注意,如果解码成功完成,结果 return 是字节字符串剩余部分(即 base64 编码字节之后的所有内容)的 Producer。这使您可以在图像字节结束的地方继续解析。

但是,假设您有一个 decodeBase64 函数,那么代码的工作方式的粗略轮廓是您将分为三个部分:

  • 使用适合pipes
  • binary解析器解析图像字节之前的记录前缀
  • 使用decodeBase64函数流式传输解码图像字节
  • 也使用适合pipes
  • binary解析器解析图像字节后记录的后缀

换句话说,类型和实现大致如下所示:

-- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
skipPrefix :: Data.Binary.Get ()

skipPrefix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix’ = execStateT (Pipes.Binary.decodeGet skipPrefix)

— This would match the "' }" suffix of the JSON record
skipSuffix :: Data.Binary.Get ()

skipSuffix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix’ = execStateT (Pipes.Binary.decodeGet skipSuffix)

streamImage
    ::  Monad m
    =>  Producer ByteString m r
    ->  Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
    e0 <- lift (skipPrefix’ p0)
    case e0 of
        Left exc -> return (Left (toException exc))
        Right p1 -> do
            e1 <- decodeBase64 p1
            case e1 of
                Left exc -> return (Left exc)
                Right p2 -> do
                    e2 <- lift (skipSuffix’ p2)
                    case e2 of
                        Left exc -> return (Left (toException exc))
                        Right p3 -> return (Right p3)

换句话说,streamImage 会将 Producer 作为从 JSON 记录的第一个字符开始的输入,并且它将流式传输从中提取的解码图像字节记录。如果解码成功,那么它将 return 字节流的剩余部分紧跟在 JSON 记录之后。