解码 JSON 流,其中某些值先于其他值

Decoding JSON stream where some values are needed before others

假设我们有一个像这样的 JSON 对象(使用 base64 编码的字节串):

TaggedImage = TaggedImage {  id :: Text, image :: ByteString }

现在,我们要从源接收 image,并使用 id 标签中的信息将其存储在某个位置。因此,这意味着 id 必须提前解析(以确定图像的位置),而 image 以流方式解析。这是直截了当的做法吗?

我计划使用 pipes-aesonaws(用于 S3 存储)和 pipesWebsocket 生产者使用 S3 桶作为消费者(在我们解析 id 以确定 S3 桶的位置之前无法创建)。查看 decoded 方法,我无法弄清楚我是否确实可以按照上面的要求进行操作。这是我第一次尝试在 JSON 和管道中进行流式传输。因此,我们将不胜感激。

读取和写入文件系统的简单示例也将作为 Websocket producerS3 consumer 的替代。


由于 JSON 键值对根据 RFC 无序,而数组是有序的,因此 image 数据可能先于 id,看来,对于我在上面定义的数据类型。因此,将其更改为 JSON 数组(Haskell 中的一个元组,aeson TH 推导似乎转换为有序数组)也可能有所帮助。如果需要,请随时更改数据类型定义,以强加解码顺序。例如,数据类型可能会更改为:

TaggedImage = TaggedImage (Text,ByteString)

我相信您将无法重用 pipes-aeson 库,因为它不提供流式传输已解码 JSON 记录的嵌套字段的方法,也不提供任何支持用于结构的类似光标的导航。这意味着您需要手动解析 JSON 记录的骨架。

另外,需要做一些工作来将 base64-bytestring 包装成 pipes-like API 这种类型:

-- Convert a base64-encoded stream to a raw byte stream
    :: Producer ByteString m r
    -- ^ Base64-encoded bytes
    -> Producer ByteString m (Either SomeException (Producer ByteString m r)) 
    -- ^ Raw bytes

请注意,如果解码成功完成,结果 return 是字节字符串剩余部分(即 base64 编码字节之后的所有内容)的 Producer。这使您可以在图像字节结束的地方继续解析。

但是,假设您有一个 decodeBase64 函数,那么代码的工作方式的粗略轮廓是您将分为三个部分:

  • 使用适合pipes
  • binary解析器解析图像字节之前的记录前缀
  • 使用decodeBase64函数流式传输解码图像字节
  • 也使用适合pipes
  • binary解析器解析图像字节后记录的后缀


-- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
skipPrefix :: Data.Binary.Get ()

skipPrefix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix’ = execStateT (Pipes.Binary.decodeGet skipPrefix)

— This would match the "' }" suffix of the JSON record
skipSuffix :: Data.Binary.Get ()

skipSuffix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix’ = execStateT (Pipes.Binary.decodeGet skipSuffix)

    ::  Monad m
    =>  Producer ByteString m r
    ->  Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
    e0 <- lift (skipPrefix’ p0)
    case e0 of
        Left exc -> return (Left (toException exc))
        Right p1 -> do
            e1 <- decodeBase64 p1
            case e1 of
                Left exc -> return (Left exc)
                Right p2 -> do
                    e2 <- lift (skipSuffix’ p2)
                    case e2 of
                        Left exc -> return (Left (toException exc))
                        Right p3 -> return (Right p3)

换句话说,streamImage 会将 Producer 作为从 JSON 记录的第一个字符开始的输入,并且它将流式传输从中提取的解码图像字节记录。如果解码成功,那么它将 return 字节流的剩余部分紧跟在 JSON 记录之后。