ksqlDB 中创建的流显示 NULL 值

The stream created in ksqlDB shows NULL value

我正在尝试在 ksqlDB 中创建一个流以从 kafka 主题获取数据并对其执行查询。

CREATE STREAM test_location (
  id VARCHAR,
  name VARCHAR,
  location VARCHAR
  )

 WITH (KAFKA_TOPIC='public.location',
       VALUE_FORMAT='JSON',
       PARTITIONS=10);

主题 public.location 中的数据采用 JSON 格式。

已更新主题消息。

print 'public.location' from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3

创建流后,对创建的流执行 SELECT 我在输出中得到 NULL。虽然题目有数据。

select * from test_location
>EMIT CHANGES limit 5;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID                                                               |NAME                                                            |LOCATION                                                          |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
Limit Reached
Query terminated

这是来自 docker 文件的详细信息

version: '2'

services:

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.18.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

更新: 这是我在 Kafka

中看到的主题中的一条消息
{
   "sourceTable": {
      "id": "1",
      "name": Sam,
      "location": Manchester,
      "ConnectorVersion": null,
      "connectorId": null,
      "ConnectorName": null,
      "DbName": null,
      "DbSchema": null,
      "TableName": null,
      "payload": null,
      "schema": null
   },
   "ConnectorVersion": null,
   "connectorId": null,
   "ConnectorName": null,
   "DbName": null,
   "DbSchema": null,
   "TableName": null,
   "payload": null,
   "schema": null
}

我缺少哪个步骤或配置?

鉴于您的负载,您需要声明嵌套模式,因为 idnamelocation 不是 [=31= 中的“顶级”字段],但它们嵌套在 sourceTable.

CREATE STREAM est_location (
  sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
)

定义架构时无法“展开”数据,但架构必须与主题中的内容相匹配。除了 sourceTable 之外,您还可以将 ConnectorVersion 等添加到架构中,因为它们也是您 JSON 中的“顶级”字段。底线是,ksqlDB 中的该列只能在顶级字段上声明。其他所有内容都是您可以使用 STRUCT 类型访问的嵌套数据。

当然以后查询est_location的时候可以通过sourceTable->id等方式引用个别字段

如果您想取消嵌套架构,也可以声明派生的 STREAM:

CREATE STREAM unnested_est_location AS
  SELECT sourceTable->id AS id,
         sourceTable->name AS name,
         sourceTable->location AS location
  FROM est_location;

当然,这样会把数据写到一个新的topic中。