PipelineDB 从 Kafka 消费带有数组的 JSON
PipelineDB consumer from KAFKA JSON with array
我的kafka发送如下json
'{
"eventSummaryList": [
{
"customer": 1,
"data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}",
"identifierRule": 1770,
"identifierSummary": 17,
"rule": "rota_fora",
"status": 1,
"vehicle": 103970
},
{
"customer": 2,
"data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}",
"identifierRule": 8,
"identifierSummary": 7,
"rule": "velocidade_maior",
"status": 1,
"vehicle": 103970
}
]
}'
我创建了这个连续变换
-- Original attempt from the question, kept as asked. 'eventSummaryList' is a
-- JSON array in the sample payload, and this expression looks up 'customer'
-- on the array itself rather than on each element.
CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
SELECT cast ( cast(pack ->>'eventSummaryList' as json)->>'customer' as bigint ) as customer
FROM pipeline_kafka.sensor_event_process_stream
THEN EXECUTE PROCEDURE update_sensor_event_process_t();
但是 PipelineDB 的日志返回了如下内容……
上下文:JSON 数据,第 1 行:{
COPY sensor_event_process_stream,第 1 行,列 pack:“{”
日志:[pipeline_kafka] sensor_event_process_stream <-topicNotificationProcess (PID 25201):无法处理批次,丢弃了 8 条消息
错误:类型 json 的输入语法无效
详细信息:输入字符串意外结束。
如何遍历 JSON 数组并仅获取 customer 字段的内容?
大家好,我已经解决了这个问题:我使用了 json_array_elements 函数,最终写法如下……
-- Expand the 'eventSummaryList' array into one row per event and project the
-- fields handed to update_sensor_event_process_t().
CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
SELECT
    CAST(event ->> 'identifierRule' AS BIGINT) AS id_regra,
    CAST(event ->> 'rule' AS VARCHAR) AS regra,
    -- 'data' is a JSON string that itself contains an escaped JSON document.
    -- It must be extracted as text (->>) before being parsed as json; with ->
    -- it stays a JSON string scalar and every nested key lookup returns NULL.
    CAST((event ->> 'data')::json ->> 'veiculo' AS BIGINT) AS id_veiculo,
    CAST(event ->> 'customer' AS BIGINT) AS id_cliente,
    CAST((event ->> 'data')::json ->> 'velocidade' AS INT) AS velocidade,
    CAST((event ->> 'data')::json ->> 'odometro' AS INT) AS odometro,
    CAST((event ->> 'data')::json ->> 'data_posicao' AS BIGINT) AS data_posicao,
    CAST((event ->> 'data')::json ->> 'id_motorista' AS BIGINT) AS id_motorista,
    CAST((event ->> 'data')::json ->> 'latitude' AS FLOAT) AS latitude,
    CAST((event ->> 'data')::json ->> 'longitude' AS FLOAT) AS longitude,
    CAST(event ->> 'status' AS BOOLEAN) AS status
FROM (
    -- json_array_elements already returns json, so no extra cast is needed.
    SELECT json_array_elements(pack -> 'eventSummaryList') AS event
    FROM pipeline_kafka.sensor_event_process_stream
) AS b
THEN EXECUTE PROCEDURE update_sensor_event_process_t();
一个重要的细节:不要传递带有空格和换行符的 JSON,PipelineDB 不接受。
我的kafka发送如下json
'{
"eventSummaryList": [
{
"customer": 1,
"data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}",
"identifierRule": 1770,
"identifierSummary": 17,
"rule": "rota_fora",
"status": 1,
"vehicle": 103970
},
{
"customer": 2,
"data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}",
"identifierRule": 8,
"identifierSummary": 7,
"rule": "velocidade_maior",
"status": 1,
"vehicle": 103970
}
]
}'
我创建了这个连续变换
-- Original attempt from the question, kept as asked. 'eventSummaryList' is a
-- JSON array in the sample payload, and this expression looks up 'customer'
-- on the array itself rather than on each element.
CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
SELECT cast ( cast(pack ->>'eventSummaryList' as json)->>'customer' as bigint ) as customer
FROM pipeline_kafka.sensor_event_process_stream
THEN EXECUTE PROCEDURE update_sensor_event_process_t();
但是 PipelineDB 的日志返回了如下内容……
上下文:JSON 数据,第 1 行:{
COPY sensor_event_process_stream,第 1 行,列 pack:“{”
日志:[pipeline_kafka] sensor_event_process_stream <-topicNotificationProcess (PID 25201):无法处理批次,丢弃了 8 条消息
错误:类型 json 的输入语法无效
详细信息:输入字符串意外结束。
如何遍历 JSON 数组并仅获取 customer 字段的内容?
大家好,我已经解决了这个问题:我使用了 json_array_elements 函数,最终写法如下……
-- Expand the 'eventSummaryList' array into one row per event and project the
-- fields handed to update_sensor_event_process_t().
CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
SELECT
    CAST(event ->> 'identifierRule' AS BIGINT) AS id_regra,
    CAST(event ->> 'rule' AS VARCHAR) AS regra,
    -- 'data' is a JSON string that itself contains an escaped JSON document.
    -- It must be extracted as text (->>) before being parsed as json; with ->
    -- it stays a JSON string scalar and every nested key lookup returns NULL.
    CAST((event ->> 'data')::json ->> 'veiculo' AS BIGINT) AS id_veiculo,
    CAST(event ->> 'customer' AS BIGINT) AS id_cliente,
    CAST((event ->> 'data')::json ->> 'velocidade' AS INT) AS velocidade,
    CAST((event ->> 'data')::json ->> 'odometro' AS INT) AS odometro,
    CAST((event ->> 'data')::json ->> 'data_posicao' AS BIGINT) AS data_posicao,
    CAST((event ->> 'data')::json ->> 'id_motorista' AS BIGINT) AS id_motorista,
    CAST((event ->> 'data')::json ->> 'latitude' AS FLOAT) AS latitude,
    CAST((event ->> 'data')::json ->> 'longitude' AS FLOAT) AS longitude,
    CAST(event ->> 'status' AS BOOLEAN) AS status
FROM (
    -- json_array_elements already returns json, so no extra cast is needed.
    SELECT json_array_elements(pack -> 'eventSummaryList') AS event
    FROM pipeline_kafka.sensor_event_process_stream
) AS b
THEN EXECUTE PROCEDURE update_sensor_event_process_t();
一个重要的细节:不要传递带有空格和换行符的 JSON,PipelineDB 不接受。