KSQLDB:Select 字段作为数组

KSQLDB: Select Fields As Array

我正在尝试通过 KsqlDb 将数据从一个主题(读取主题)转换为(写入主题)另一个主题。

这是为read-topic生成的数据

{
  "orderNumber": "01235656",
  "deliveryBarcode": "733998877",
  "requestId": "1616516663000",
  "status": "APPROVED_BY_SUPERVISOR"
}

我写了这些 ksqldb 查询:

-- The general stream to read the topic is like this:
CREATE STREAM GENERAL_STREAM (
    deliveryBarcode VARCHAR,
    orderNumber VARCHAR,
    requestId VARCHAR,
    status VARCHAR
) WITH (
    kafka_topic = 'read-topic',
    value_format = 'json'
);


-- This is the stream to redirect the filtered data throgh 'write-topic'
CREATE STREAM REDIRECTION_STREAM
WITH (
    partitions = 6,
    replicas = 3,
    kafka_topic = 'write-topic',
    value_format = 'json'
) AS
SELECT
       AS_VALUE(requestId) `requestId`,
       ARRAY<STRUCT<
        deliveryBarcode 
        orderNumber
       >> `packages`
FROM EARTH_DELIVERY_COURIER_PUDOPACKAGESTATUSUPDATED_0
WHERE (payload -> status = 'APPROVED_BY_SUPERVISOR')
EMIT CHANGES;

但是因为这部分我的查询不起作用:

ARRAY<STRUCT<
        deliveryBarcode 
        orderNumber
       >> `packages`

我对write-topic的预期数据是这样的

{
  "requestId": "1616516663000"
  "packages":[
    {
      "ordernumber":"01235656",
      "barcodenumber":"733998877"
    }
  ]
}

我应该如何修改这些查询才能按预期生成数组格式的 'packages' 字段?

您可以使用 array() ans as_map() 函数来生成您期望的输出。

这是我用来解决问题的 CSAS:

CREATE STREAM REDIRECTION_STREAM 
WITH (kafka_topic='write_topic', value_format='json') 
AS SELECT 
  AS_VALUE(requestId) requestId, 
  ARRAY[
    AS_MAP(
      ARRAY['deliverybarcode', 'ordernumber'], 
      ARRAY[deliverybarcode, ordernumber]
    )
  ] packages 
FROM GENERAL_STREAM;

这是上述主题的输出

print 'write_topic' from BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/24 13:24:44.557 Z, key: <null>, value: {"REQUESTID":"1616516663000","PACKAGES":[{"ordernumber":"01235656","deliverybarcode":"733998877"}]}, partition: 0
^CTopic