为什么 KSQLDB 推送查询输出分块 json?

How come KSQLDB push query outputs chunked json?

我是 KsqlDB 的新手,所以我可能遗漏了一些明显的东西。我的问题与 never-ending push-query 的分块 JSON 输出无效 JSON 有关。让我详细说明。

简而言之,我的设置如下。从 typescript/node 过程中,我在 ksql 流上创建了一个推送查询,如下所示:

CREATE STREAM events (id VARCHAR, timestamp VARCHAR, location VARCHAR, events ARRAY<VARCHAR>) WITH (kafka_topic='mytopic', value_format='json', partitions=1);

推送查询本身创建为 long-running REST 流(使用 axios):

const response = await axios.post(
    `http://ksqldb-server:8088/query-stream`,
    {
        sql: `SELECT * FROM events EMIT CHANGES;`,
        streamsProperties: {}
    },
    {
        headers: {
            'Content-Type': 'application/vnd.ksql.v1+json',
            Accept: 'application/vnd.ksql.v1+json',
        },
        responseType: 'stream',
    }
);

这行得通。当运行时,我首先得到header行:

[{"header":{"queryId":"transient_EVENTS_2815830975103425962","schema":"`ID` STRING, `TIMESTAMP` STRING, `LOCATION` STRING, `EVENTS` ARRAY<STRING>"}}

随后根据 real-world 个事件进入 one-by-one 的新行:

{"row":{"columns":["b82baad7-a87e-4617-b18a-1782b4cb49ce","2022-05-16 08:03:03","Home",["EventA","EventD"]]}},\n

现在,如果这个查询能够完成,它可能会在连接在一起时最终成为有效的 JSON(尽管 header 行末尾缺少 ,)。然而,由于它是一个推送查询,它永远不会完成,因此我不会收到结束 ] - 这意味着它永远不会有效 JSON。另外,我希望在 real-time 中处理事件,否则我可以编写一个拉取查询。

我的期望是每个新行都可以使用 JSON.parse() 自行解析。相反,我最终不得不 JSON.parse(data.slice(0, -2)) 摆脱额外的 ,\n。但是,感觉不太适合投产。

在推送查询时输出分块 JSON 背后的原因是什么?对于任何 use-case.

我来说这似乎是一种不合逻辑的格式

有没有办法将 ksql 事件的输出更改为我所期望的?也许我缺少某些 header 或属性?

感谢您的见解!

您在 headers 中将 application/vnd.ksql.v1+json 明确设置为所需的响应格式:

headers: {
            'Content-Type': 'application/vnd.ksql.v1+json',
            Accept: 'application/vnd.ksql.v1+json',
        },

application/vnd.ksql.v1+json 表示完整的回复将是一个有效的 JSON 文档。

正如您所指出的,这是不切实际的,因为推送查询永远不会完成。您应该删除 headers 或将它们明确设置为默认值 application/vnd.ksqlapi.delimited.v1application/vnd.ksqlapi.delimited.v1 意味着每个返回的行都将是有效的 JSON.

有关详细信息,请参阅 https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries