为什么 KSQLDB 推送查询输出分块 json?
How come KSQLDB push query outputs chunked json?
我是 KsqlDB 的新手,所以我可能遗漏了一些明显的东西。我的问题与 never-ending push-query 的分块 JSON 输出无效 JSON 有关。让我详细说明。
简而言之,我的设置如下。从 typescript/node 过程中,我在 ksql 流上创建了一个推送查询,如下所示:
CREATE STREAM events (id VARCHAR, timestamp VARCHAR, location VARCHAR, events ARRAY<VARCHAR>) WITH (kafka_topic='mytopic', value_format='json', partitions=1);
推送查询本身创建为 long-running REST 流(使用 axios):
const response = await axios.post(
`http://ksqldb-server:8088/query-stream`,
{
sql: `SELECT * FROM events EMIT CHANGES;`,
streamsProperties: {}
},
{
headers: {
'Content-Type': 'application/vnd.ksql.v1+json',
Accept: 'application/vnd.ksql.v1+json',
},
responseType: 'stream',
}
);
这行得通。当运行时,我首先得到header行:
[{"header":{"queryId":"transient_EVENTS_2815830975103425962","schema":"`ID` STRING, `TIMESTAMP` STRING, `LOCATION` STRING, `EVENTS` ARRAY<STRING>"}}
随后根据 real-world 个事件进入 one-by-one 的新行:
{"row":{"columns":["b82baad7-a87e-4617-b18a-1782b4cb49ce","2022-05-16 08:03:03","Home",["EventA","EventD"]]}},\n
现在,如果这个查询能够完成,它可能会在连接在一起时最终成为有效的 JSON(尽管 header 行末尾缺少 ,
)。然而,由于它是一个推送查询,它永远不会完成,因此我不会收到结束 ]
- 这意味着它永远不会有效 JSON。另外,我希望在 real-time 中处理事件,否则我可以编写一个拉取查询。
我的期望是每个新行都可以使用 JSON.parse()
自行解析。相反,我最终不得不 JSON.parse(data.slice(0, -2))
摆脱额外的 ,\n
。但是,感觉不太适合投产。
在推送查询时输出分块 JSON 背后的原因是什么?对于任何 use-case.
我来说这似乎是一种不合逻辑的格式
有没有办法将 ksql 事件的输出更改为我所期望的?也许我缺少某些 header 或属性?
感谢您的见解!
您在 headers 中将 application/vnd.ksql.v1+json
明确设置为所需的响应格式:
headers: {
'Content-Type': 'application/vnd.ksql.v1+json',
Accept: 'application/vnd.ksql.v1+json',
},
application/vnd.ksql.v1+json
表示完整的回复将是一个有效的 JSON 文档。
正如您所指出的,这是不切实际的,因为推送查询永远不会完成。您应该删除 headers 或将它们明确设置为默认值 application/vnd.ksqlapi.delimited.v1
。 application/vnd.ksqlapi.delimited.v1
意味着每个返回的行都将是有效的 JSON.
我是 KsqlDB 的新手,所以我可能遗漏了一些明显的东西。我的问题与 never-ending push-query 的分块 JSON 输出无效 JSON 有关。让我详细说明。
简而言之,我的设置如下。从 typescript/node 过程中,我在 ksql 流上创建了一个推送查询,如下所示:
CREATE STREAM events (id VARCHAR, timestamp VARCHAR, location VARCHAR, events ARRAY<VARCHAR>) WITH (kafka_topic='mytopic', value_format='json', partitions=1);
推送查询本身创建为 long-running REST 流(使用 axios):
const response = await axios.post(
`http://ksqldb-server:8088/query-stream`,
{
sql: `SELECT * FROM events EMIT CHANGES;`,
streamsProperties: {}
},
{
headers: {
'Content-Type': 'application/vnd.ksql.v1+json',
Accept: 'application/vnd.ksql.v1+json',
},
responseType: 'stream',
}
);
这行得通。当运行时,我首先得到header行:
[{"header":{"queryId":"transient_EVENTS_2815830975103425962","schema":"`ID` STRING, `TIMESTAMP` STRING, `LOCATION` STRING, `EVENTS` ARRAY<STRING>"}}
随后根据 real-world 个事件进入 one-by-one 的新行:
{"row":{"columns":["b82baad7-a87e-4617-b18a-1782b4cb49ce","2022-05-16 08:03:03","Home",["EventA","EventD"]]}},\n
现在,如果这个查询能够完成,它可能会在连接在一起时最终成为有效的 JSON(尽管 header 行末尾缺少 ,
)。然而,由于它是一个推送查询,它永远不会完成,因此我不会收到结束 ]
- 这意味着它永远不会有效 JSON。另外,我希望在 real-time 中处理事件,否则我可以编写一个拉取查询。
我的期望是每个新行都可以使用 JSON.parse()
自行解析。相反,我最终不得不 JSON.parse(data.slice(0, -2))
摆脱额外的 ,\n
。但是,感觉不太适合投产。
在推送查询时输出分块 JSON 背后的原因是什么?对于任何 use-case.
我来说这似乎是一种不合逻辑的格式有没有办法将 ksql 事件的输出更改为我所期望的?也许我缺少某些 header 或属性?
感谢您的见解!
您在 headers 中将 application/vnd.ksql.v1+json
明确设置为所需的响应格式:
headers: {
'Content-Type': 'application/vnd.ksql.v1+json',
Accept: 'application/vnd.ksql.v1+json',
},
application/vnd.ksql.v1+json
表示完整的回复将是一个有效的 JSON 文档。
正如您所指出的,这是不切实际的,因为推送查询永远不会完成。您应该删除 headers 或将它们明确设置为默认值 application/vnd.ksqlapi.delimited.v1
。 application/vnd.ksqlapi.delimited.v1
意味着每个返回的行都将是有效的 JSON.