ksql - 从 json 数组创建流
ksql - creating a stream from a json array
我的kafka主题正在推送这种格式的数据(来自collectd):
[{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]
它是数组、整数和浮点数的组合...并且整个内容都在一个 json 数组中。结果,我有一段时间使用 ksql 来处理这些数据。
当我将 'default' 流创建为
create stream cd_temp with (kafka_topic='ctd_test', value_format='json');
我得到这个结果:
ksql> describe cd_temp;
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
-------------------------------------
任何 select 都会 return ROWTIME 和 ROWKEY 的 8 位十六进制值。
我花了一些时间尝试提取 json 字段但无济于事。我关心的是:
ksql> print 'ctd_test' from beginning;
Format:JSON
com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
难不成这个话题不能用在ksql中?有没有一种技术可以解包外部数组以获取内部有趣的位?
在撰写本文时(2018 年 6 月),KSQL 无法处理 JSON 消息,其中整个消息都嵌入到顶级数组中。有一个github issue to track this。我建议在这个问题上添加 +1 投票以提高它的优先级。
此外,我注意到您的创建流语句没有定义 json 消息的架构。虽然这在这种情况下无济于事,但对于其他 Json 输入格式,您需要这样做,即您创建的语句应该类似于:
create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');
我的kafka主题正在推送这种格式的数据(来自collectd):
[{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]
它是数组、整数和浮点数的组合...并且整个内容都在一个 json 数组中。结果,我有一段时间使用 ksql 来处理这些数据。
当我将 'default' 流创建为
create stream cd_temp with (kafka_topic='ctd_test', value_format='json');
我得到这个结果:
ksql> describe cd_temp;
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
-------------------------------------
任何 select 都会 return ROWTIME 和 ROWKEY 的 8 位十六进制值。
我花了一些时间尝试提取 json 字段但无济于事。我关心的是:
ksql> print 'ctd_test' from beginning;
Format:JSON
com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
难不成这个话题不能用在ksql中?有没有一种技术可以解包外部数组以获取内部有趣的位?
在撰写本文时(2018 年 6 月),KSQL 无法处理 JSON 消息,其中整个消息都嵌入到顶级数组中。有一个github issue to track this。我建议在这个问题上添加 +1 投票以提高它的优先级。
此外,我注意到您的创建流语句没有定义 json 消息的架构。虽然这在这种情况下无济于事,但对于其他 Json 输入格式,您需要这样做,即您创建的语句应该类似于:
create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');