汇合 KSQL 中的空处理
Null Handling in confluent KSQL
能否请您告诉我如何处理 KSQL 中的 Null 值。我尝试处理 4 种可能的方式,但没有得到解决。尝试使用 4 种方法在 KSQL 中用不同的值替换 NULL 但给出了问题。
ksql> select PORTFOLIO_PLAN_ID from topic_stream_name; null
ksql> select COALESCE(PORTFOLIO_PLAN_ID,'N/A') from topic_stream_name; Can't find any functions with the name 'COALESCE'
ksql> select IFNULL(PORTFOLIO_PLAN_ID,'N/A') from topic_stream_name; Function 'IFNULL' does not accept parameters of types:[BIGINT, VARCHAR(STRING)]
ksql> select if(PORTFOLIO_PLAN_ID IS NOT NULL,PORTFOLIO_PLAN_ID,'N/A') FROM topic_stream_name; Can't find any functions with the name 'IF'
正如@cricket_007 所引用的那样,在KSQL 中有一个open ticket 用于以这种方式处理NULL 值。
您可以使用的一种解决方法是使用 INSERT INTO。它不是很优雅,而且肯定不如 COALESCE
:
这样的函数灵活
# Set up some sample data, run this from bash
# For more info about kafkacat see
# https://docs.confluent.io/current/app-development/kafkacat-usage.html
kafkacat -b kafka-broker:9092 \
-t topic_with_nulls \
-P <<EOF
{"col1":1,"col2":16000,"col3":"foo"}
{"col1":2,"col2":42000}
{"col1":3,"col2":94000,"col3":"bar"}
{"col1":4,"col2":12345}
EOF
这是在 col3
中处理 NULL 的 KSQL 解决方法:
-- Register the topic
CREATE STREAM topic_with_nulls (COL1 INT, COL2 INT, COL3 VARCHAR) \
WITH (KAFKA_TOPIC='topic_with_nulls',VALUE_FORMAT='JSON');
-- Query the topic to show there are some null values
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT COL1, COL2, COL3 FROM topic_with_nulls;
1 | 16000 | foo
2 | 42000 | null
3 | 94000 | bar
4 | 12345 | null
-- Create a derived stream, with just records with no NULLs in COL3
CREATE STREAM NULL_WORKAROUND AS \
SELECT COL1, COL2, COL3 FROM topic_with_nulls WHERE COL3 IS NOT NULL;
-- Insert into the derived stream any records where COL3 *is* NULL, replacing it with a fixed string
INSERT INTO NULL_WORKAROUND \
SELECT COL1, COL2, 'N/A' AS COL3 FROM topic_with_nulls WHERE COL3 IS NULL;
-- Confirm that the NULL substitution worked
ksql> SELECT COL1, COL2, COL3 FROM NULL_WORKAROUND;
1 | 16000 | foo
2 | 42000 | N/A
3 | 94000 | bar
4 | 12345 | N/A
能否请您告诉我如何处理 KSQL 中的 Null 值。我尝试处理 4 种可能的方式,但没有得到解决。尝试使用 4 种方法在 KSQL 中用不同的值替换 NULL 但给出了问题。
ksql> select PORTFOLIO_PLAN_ID from topic_stream_name; null
ksql> select COALESCE(PORTFOLIO_PLAN_ID,'N/A') from topic_stream_name; Can't find any functions with the name 'COALESCE'
ksql> select IFNULL(PORTFOLIO_PLAN_ID,'N/A') from topic_stream_name; Function 'IFNULL' does not accept parameters of types:[BIGINT, VARCHAR(STRING)]
ksql> select if(PORTFOLIO_PLAN_ID IS NOT NULL,PORTFOLIO_PLAN_ID,'N/A') FROM topic_stream_name; Can't find any functions with the name 'IF'
正如@cricket_007 所引用的那样,在KSQL 中有一个open ticket 用于以这种方式处理NULL 值。
您可以使用的一种解决方法是使用 INSERT INTO。它不是很优雅,而且肯定不如 COALESCE
:
# Set up some sample data, run this from bash
# For more info about kafkacat see
# https://docs.confluent.io/current/app-development/kafkacat-usage.html
kafkacat -b kafka-broker:9092 \
-t topic_with_nulls \
-P <<EOF
{"col1":1,"col2":16000,"col3":"foo"}
{"col1":2,"col2":42000}
{"col1":3,"col2":94000,"col3":"bar"}
{"col1":4,"col2":12345}
EOF
这是在 col3
中处理 NULL 的 KSQL 解决方法:
-- Register the topic
CREATE STREAM topic_with_nulls (COL1 INT, COL2 INT, COL3 VARCHAR) \
WITH (KAFKA_TOPIC='topic_with_nulls',VALUE_FORMAT='JSON');
-- Query the topic to show there are some null values
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT COL1, COL2, COL3 FROM topic_with_nulls;
1 | 16000 | foo
2 | 42000 | null
3 | 94000 | bar
4 | 12345 | null
-- Create a derived stream, with just records with no NULLs in COL3
CREATE STREAM NULL_WORKAROUND AS \
SELECT COL1, COL2, COL3 FROM topic_with_nulls WHERE COL3 IS NOT NULL;
-- Insert into the derived stream any records where COL3 *is* NULL, replacing it with a fixed string
INSERT INTO NULL_WORKAROUND \
SELECT COL1, COL2, 'N/A' AS COL3 FROM topic_with_nulls WHERE COL3 IS NULL;
-- Confirm that the NULL substitution worked
ksql> SELECT COL1, COL2, COL3 FROM NULL_WORKAROUND;
1 | 16000 | foo
2 | 42000 | N/A
3 | 94000 | bar
4 | 12345 | N/A