KSQLDB:Select 字段作为数组
KSQLDB: Select Fields As Array
我正在尝试通过 KsqlDb 将数据从一个主题(读取主题)转换为(写入主题)另一个主题。
这是为read-topic生成的数据
{
"orderNumber": "01235656",
"deliveryBarcode": "733998877",
"requestId": "1616516663000",
"status": "APPROVED_BY_SUPERVISOR"
}
我写了这些 ksqldb 查询:
-- The general stream to read the topic is like this:
CREATE STREAM GENERAL_STREAM (
deliveryBarcode VARCHAR,
orderNumber VARCHAR,
requestId VARCHAR,
status VARCHAR
) WITH (
kafka_topic = 'read-topic',
value_format = 'json'
);
-- This is the stream to redirect the filtered data throgh 'write-topic'
CREATE STREAM REDIRECTION_STREAM
WITH (
partitions = 6,
replicas = 3,
kafka_topic = 'write-topic',
value_format = 'json'
) AS
SELECT
AS_VALUE(requestId) `requestId`,
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
FROM EARTH_DELIVERY_COURIER_PUDOPACKAGESTATUSUPDATED_0
WHERE (payload -> status = 'APPROVED_BY_SUPERVISOR')
EMIT CHANGES;
但是因为这部分我的查询不起作用:
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
我对write-topic的预期数据是这样的
{
"requestId": "1616516663000"
"packages":[
{
"ordernumber":"01235656",
"barcodenumber":"733998877"
}
]
}
我应该如何修改这些查询才能按预期生成数组格式的 'packages' 字段?
您可以使用 array() ans as_map() 函数来生成您期望的输出。
这是我用来解决问题的 CSAS:
CREATE STREAM REDIRECTION_STREAM
WITH (kafka_topic='write_topic', value_format='json')
AS SELECT
AS_VALUE(requestId) requestId,
ARRAY[
AS_MAP(
ARRAY['deliverybarcode', 'ordernumber'],
ARRAY[deliverybarcode, ordernumber]
)
] packages
FROM GENERAL_STREAM;
这是上述主题的输出
print 'write_topic' from BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/24 13:24:44.557 Z, key: <null>, value: {"REQUESTID":"1616516663000","PACKAGES":[{"ordernumber":"01235656","deliverybarcode":"733998877"}]}, partition: 0
^CTopic
我正在尝试通过 KsqlDb 将数据从一个主题(读取主题)转换为(写入主题)另一个主题。
这是为read-topic生成的数据
{
"orderNumber": "01235656",
"deliveryBarcode": "733998877",
"requestId": "1616516663000",
"status": "APPROVED_BY_SUPERVISOR"
}
我写了这些 ksqldb 查询:
-- The general stream to read the topic is like this:
CREATE STREAM GENERAL_STREAM (
deliveryBarcode VARCHAR,
orderNumber VARCHAR,
requestId VARCHAR,
status VARCHAR
) WITH (
kafka_topic = 'read-topic',
value_format = 'json'
);
-- This is the stream to redirect the filtered data throgh 'write-topic'
CREATE STREAM REDIRECTION_STREAM
WITH (
partitions = 6,
replicas = 3,
kafka_topic = 'write-topic',
value_format = 'json'
) AS
SELECT
AS_VALUE(requestId) `requestId`,
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
FROM EARTH_DELIVERY_COURIER_PUDOPACKAGESTATUSUPDATED_0
WHERE (payload -> status = 'APPROVED_BY_SUPERVISOR')
EMIT CHANGES;
但是因为这部分我的查询不起作用:
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
我对write-topic的预期数据是这样的
{
"requestId": "1616516663000"
"packages":[
{
"ordernumber":"01235656",
"barcodenumber":"733998877"
}
]
}
我应该如何修改这些查询才能按预期生成数组格式的 'packages' 字段?
您可以使用 array() ans as_map() 函数来生成您期望的输出。
这是我用来解决问题的 CSAS:
CREATE STREAM REDIRECTION_STREAM
WITH (kafka_topic='write_topic', value_format='json')
AS SELECT
AS_VALUE(requestId) requestId,
ARRAY[
AS_MAP(
ARRAY['deliverybarcode', 'ordernumber'],
ARRAY[deliverybarcode, ordernumber]
)
] packages
FROM GENERAL_STREAM;
这是上述主题的输出
print 'write_topic' from BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/24 13:24:44.557 Z, key: <null>, value: {"REQUESTID":"1616516663000","PACKAGES":[{"ordernumber":"01235656","deliverybarcode":"733998877"}]}, partition: 0
^CTopic