KsqlStatementException: 语句没有定义模式,提供的格式不支持模式推断

KsqlStatementException: statement does not define the schema and the supplied format does not support schema inference

我在 kubernetes 上使用 confluent helm charts 使用 ksql-server。

https://github.com/confluentinc/cp-helm-charts/tree/master/charts/cp-ksql-server

我针对自己的个人用例修改了 queries.sql 文件。 https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-ksql-server/queries.sql

这是我的查询:

CREATE STREAM ksql_test WITH (kafka_topic='orders-topic', value_format='DELIMITED', partitions='1', replicas='3');

部署此 pod 后,出现此错误:

ERROR Failed to start KSQL Server with query file: /etc/ksql/queries/queries.sql (io.confluent.ksql.rest.server.StandaloneExecutor:124)
io.confluent.ksql.util.KsqlStatementException: statement does not define the schema and the supplied format does not support schema inference
Statement: CREATE STREAM ksql_test WITH (kafka_topic='orders-topic', value_format='DELIMITED', partitions='1', replicas='3');

无论我是否将格式更改为 JSON,错误仍然存​​在,而且我没有该主题的架构。

问题是:

statement does not define the schema

KSQL stream 是一个 Kafka 主题 plus 一个模式。没有模式,没有流。

如果您的数据是分隔的,可能看起来像这样:

1,FOO,0.1,400

它有一个架构,可能类似于:

CREATE STREAM example (COL1 INT, LABEL VARCHAR, WIBBLE DOUBLE, TARGET BIGINT)
WITH (KAFKA_TOPIC='example_topic',  VALUE_FORMAT='DELIMITED);

tl;dr 你不能在没有模式的情况下创建流。 如果你正在使用 Avro,你已经有一个模式(在模式注册表中)因此不要必须声明它。如果您使用 JSON 或 Delimited,则必须显式声明架构。