The stream created in ksqlDB shows NULL values
I am trying to create a stream in ksqlDB to pull data from a Kafka topic and run queries against it.
CREATE STREAM test_location (
id VARCHAR,
name VARCHAR,
location VARCHAR
)
WITH (KAFKA_TOPIC='public.location',
VALUE_FORMAT='JSON',
PARTITIONS=10);
The data in the topic public.location is in JSON format. Here is a message printed from the topic:
print 'public.location' from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3
After creating the stream, when I run a SELECT against it I get NULL in the output, even though the topic has data.
select * from test_location
>EMIT CHANGES limit 5;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID |NAME |LOCATION |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
Limit Reached
Query terminated
Here are the details from the Docker Compose file:
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.18.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
Update: here is a message from the topic as I see it in Kafka:
{
  "sourceTable": {
    "id": "1",
    "name": Sam,
    "location": Manchester,
    "ConnectorVersion": null,
    "connectorId": null,
    "ConnectorName": null,
    "DbName": null,
    "DbSchema": null,
    "TableName": null,
    "payload": null,
    "schema": null
  },
  "ConnectorVersion": null,
  "connectorId": null,
  "ConnectorName": null,
  "DbName": null,
  "DbSchema": null,
  "TableName": null,
  "payload": null,
  "schema": null
}
Which step or configuration am I missing?
Given your payload, you need to declare a nested schema, because id, name, and location are not "top-level" fields in your JSON; they are nested inside sourceTable:
CREATE STREAM est_location (
    sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
  ) WITH (KAFKA_TOPIC='public.location',
          VALUE_FORMAT='JSON');
You cannot "unwrap" the data while defining the schema; the schema must match what is actually in the topic. Besides sourceTable, you could also add ConnectorVersion and the other fields to the schema, since they are "top-level" fields in your JSON as well. The bottom line is that a column in ksqlDB can only be declared on a top-level field; everything else is nested data that you access via the STRUCT type.
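For illustration, a fuller schema along those lines might look like the sketch below. The stream name est_location_full is hypothetical, and the extra fields are declared as VARCHAR only as an assumption, since the sample message shows them all as null:

CREATE STREAM est_location_full (
    sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>,
    ConnectorVersion VARCHAR,
    connectorId VARCHAR,
    ConnectorName VARCHAR
    -- ...the remaining top-level fields (DbName, DbSchema, TableName, ...) declared the same way
  ) WITH (KAFKA_TOPIC='public.location',
          VALUE_FORMAT='JSON');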
Of course, when you later query est_location you can reference the individual fields via sourceTable->id and so on.
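For example, a minimal push query using that syntax (mirroring your original LIMIT 5 query; the selected columns are just illustrative):

SELECT sourceTable->id,
       sourceTable->name,
       sourceTable->location
FROM est_location
EMIT CHANGES LIMIT 5;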
If you want to un-nest the schema, you can also declare a derived STREAM:
CREATE STREAM unnested_est_location AS
SELECT sourceTable->id AS id,
sourceTable->name AS name,
sourceTable->location AS location
FROM est_location;
Of course, this writes the data into a new topic.
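After that, a query like your original one against the derived stream should return the expected values, for example:

SELECT * FROM unnested_est_location EMIT CHANGES LIMIT 5;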