KSQL streams - EXPLODE null issue
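I am working with a stream of JSON messages about the availability of malaria medicine in Zambia, and I have run into a problem that I can't find an answer to online. The JSON I receive looks like this: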
{
"Country": "Zambia",
"City": "Lusaka",
"Area": [
"Northmead"
],
"MalariaMedicine": [
{
"pharmacyName": "Northmead Health",
"brand": "Chloroquin",
"quantity": 65,
"batchNumber": "CHLORO 628 C",
"bestBeforeDate": "2025-05-23",
"expired": false,
"batchInformation": {
"number": "CHLORO 628 C",
"expiration": "2025-01-23"
}
},
{
"pharmacyName": "Prime Pharmacy",
"brand": "Quinin",
"quantity": 205,
"batchNumber": "QUIN 560 Q",
"bestBeforeDate": "2028-01-01",
"expired": false,
"batchInformation": {
"number": "QUIN 560 Q",
"expiration": "2028-01-01"
}
}
]
}
I pushed the JSON to a topic named Malaria and created a stream over it with the statement below.
CREATE STREAM MALARIASTREAM
(
COUNTRY STRING,
CITY STRING,
AREA ARRAY<STRING>,
MALARIAMEDICINE ARRAY<STRUCT<PHARMACYNAME STRING, BRAND STRING, QUANTITY INTEGER, BATCHNUMBER STRING, BESTBEFOREDATE STRING, EXPIRED BOOLEAN, BATCHINFORMATION STRUCT<NUMBER STRING, EXPIRATION STRING>>>
)
WITH (KAFKA_TOPIC='Malaria', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
The problem appears when I try to extract the data with the following SELECT statement:
SELECT
COUNTRY,
CITY,
EXPLODE(AREA) AS AREA,
EXPLODE(MALARIAMEDICINE)->pharmacyName,
EXPLODE(MALARIAMEDICINE)->brand,
EXPLODE(MALARIAMEDICINE)->quantity,
EXPLODE(MALARIAMEDICINE)->batchNumber,
EXPLODE(MALARIAMEDICINE)->bestBeforeDate,
EXPLODE(MALARIAMEDICINE)->expired
FROM
MalariaStream EMIT CHANGES;
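In the result set, the AREA column is NULL in the second row. Both pharmacies are in the Northmead area, so I expected the second row to show Northmead as well.
How can I get the second row to show Northmead too?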
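EXPLODE is a table function, and when a SELECT contains several table functions ksqlDB combines their outputs row by row, padding the shorter one with NULL. Here AREA has one element and MALARIAMEDICINE has two, so the second row has no AREA value.
If you know the array will always contain exactly one element, you can use ELT(1, Area)
to select the first element of that single-element array instead of exploding it.
https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#elt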
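As a rough sketch of what that could look like against the stream from the question: the query below stops exploding AREA and reads its first element directly, so the value is repeated on every row produced by EXPLODE(MALARIAMEDICINE). It uses the array subscript AREA[1] (ksqlDB arrays are indexed from 1) as an alternative to the ELT call suggested above; this swap is my assumption, not part of the original answer, and it has not been run against the asker's data. It only makes sense if AREA always holds exactly one element.

```sql
-- Sketch only: assumes AREA always contains exactly one element.
SELECT
  COUNTRY,
  CITY,
  -- Scalar lookup instead of EXPLODE(AREA); repeated on every output row.
  AREA[1] AS AREA,
  -- Only one array is exploded now, so no NULL padding is needed.
  EXPLODE(MALARIAMEDICINE)->pharmacyName,
  EXPLODE(MALARIAMEDICINE)->brand,
  EXPLODE(MALARIAMEDICINE)->quantity,
  EXPLODE(MALARIAMEDICINE)->batchNumber,
  EXPLODE(MALARIAMEDICINE)->bestBeforeDate,
  EXPLODE(MALARIAMEDICINE)->expired
FROM
  MalariaStream EMIT CHANGES;
```

If AREA can hold more than one element, taking only the first one would silently drop the rest, so this approach would need rethinking in that case.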