KSQL UDF: DataException: Struct schemas do not match
I'm trying to create a stream as select (CSAS). The stream is created successfully, but when I push a message onto the source topic I get the following exception:
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:247)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:116)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:93)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
Below are the details, from ksql-cli, of the main stream, the persistent stream, and the UDF function. I'm not sure why the schemas are incompatible: as you can see in the processed stream below, there is a field named article whose schema is exactly the same as the UDF's return value. Am I missing something here?
ksql> create stream main_stream ( article struct< _id VARCHAR, title VARCHAR, text VARCHAR, action VARCHAR, url VARCHAR, feed_id VARCHAR, mode VARCHAR, score INTEGER, published_at VARCHAR, retrieved_at VARCHAR> ) with (KAFKA_TOPIC='articles', value_format='JSON');
ksql> create stream processed as select test(article) article from main_stream;
ksql> describe processed;
Name : processed
Field | Type
-------------------------------------------------------------------------------------------------------------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ARTICLE | STRUCT<_ID VARCHAR(STRING), RAW_TITLE VARCHAR(STRING), RAW_TEXT VARCHAR(STRING), PROCESSED_TITLE VARCHAR(STRING), PROCESSED_TEXT VARCHAR(STRING)>
-------------------------------------------------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> show queries;
Query ID | Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------------------------------------------
CSAS_processed_20 | processed | CREATE STREAM processed WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'processed') AS SELECT TEST(MAIN_STREAM.ARTICLE) "ARTICLE"
FROM MAIN_STREAM MAIN_STREAM;
--------------------------------------------------------------------------------------------------------------------------------------------------------------
ksql> describe function test;
Name : TEST
Overview : test udf
Type : scalar
Jar : /Users/ktawfik/libs/custom-udf.jar
Variations :
Variation : TEST(article STRUCT<_ID VARCHAR, TITLE VARCHAR, TEXT VARCHAR, ACTION VARCHAR, URL VARCHAR, FEED_ID VARCHAR, MODE VARCHAR, SCORE INT, PUBLISHED_AT VARCHAR, RETRIEVED_AT VARCHAR>)
Returns : STRUCT<_ID VARCHAR, RAW_TITLE VARCHAR, RAW_TEXT VARCHAR, PROCESSED_TITLE VARCHAR, PROCESSED_TEXT VARCHAR>
Description : test
article : A complete article object
And below is the UDF code I used:
@Udf(description = "test",
     schema = "struct< _id VARCHAR, raw_title VARCHAR, raw_text VARCHAR, processed_title VARCHAR, processed_text VARCHAR>")
public Struct processDocument(
        @UdfParameter(
            schema = "struct< _id VARCHAR, title VARCHAR, text VARCHAR, action VARCHAR, url VARCHAR, feed_id VARCHAR, mode VARCHAR, score INTEGER, published_at VARCHAR, retrieved_at VARCHAR>",
            value = "article",
            description = "A complete article object") Struct struct) {
    Schema ARTICLE_SCHEMA = SchemaBuilder.struct()
            .field("_id", Schema.STRING_SCHEMA)
            .field("raw_title", Schema.STRING_SCHEMA)
            .field("raw_text", Schema.STRING_SCHEMA)
            .field("processed_title", Schema.STRING_SCHEMA)
            .field("processed_text", Schema.STRING_SCHEMA)
            .build();
    Struct proStruct = new Struct(ARTICLE_SCHEMA);
    proStruct.put("_id", "1234");
    proStruct.put("raw_title", "RAW_TITLE___1234");
    proStruct.put("raw_text", "RAW_TEXT___1234");
    proStruct.put("processed_title", "TITLE____1234");
    proStruct.put("processed_text", "TEXT____1234");
    System.out.println(proStruct);
    // Struct{_id=1234,raw_title=RAW_TITLE___1234,raw_text=RAW_TEXT___1234,processed_title=TITLE____1234,processed_text=TEXT____1234}
    return proStruct;
}
I was able to figure out the problem and fix it. The root cause is that the KSQL engine converts schema field names to upper case, so when my UDF returned lower-case field names the engine could not match them. This is not clear in the documentation.
The fix was to make sure that:
- all fields in the programmatically defined Schema are upper case, and so are the schema fields in the @Udf annotation;
- all fields in the programmatically defined schema exactly match (in both name and type) the fields declared in the @Udf annotation's schema.
The code ends up looking like this:
@Udf(description = "test",
     schema = "struct< _ID VARCHAR, RAW_TITLE VARCHAR, RAW_TEXT VARCHAR, PROCESSED_TITLE VARCHAR, PROCESSED_TEXT VARCHAR>")
public Struct processDocument(
        @UdfParameter(
            schema = "struct< _id VARCHAR, title VARCHAR, text VARCHAR, action VARCHAR, url VARCHAR, feed_id VARCHAR, mode VARCHAR, score INTEGER, published_at VARCHAR, retrieved_at VARCHAR>",
            value = "article",
            description = "A complete article object") Struct struct) {
    Schema ARTICLE_SCHEMA = SchemaBuilder.struct()
            .field("_ID", Schema.STRING_SCHEMA)
            .field("RAW_TITLE", Schema.STRING_SCHEMA)
            .field("RAW_TEXT", Schema.STRING_SCHEMA)
            .field("PROCESSED_TITLE", Schema.STRING_SCHEMA)
            .field("PROCESSED_TEXT", Schema.STRING_SCHEMA)
            .build();
    Struct proStruct = new Struct(ARTICLE_SCHEMA);
    proStruct.put("_ID", "1234");
    proStruct.put("RAW_TITLE", "RAW_TITLE___1234");
    proStruct.put("RAW_TEXT", "RAW_TEXT___1234");
    proStruct.put("PROCESSED_TITLE", "TITLE____1234");
    proStruct.put("PROCESSED_TEXT", "TEXT____1234");
    System.out.println(proStruct);
    return proStruct;
}
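The case-sensitivity at the heart of the error can be illustrated without any Kafka dependencies. A minimal stand-alone sketch (plain Java, using a Set in place of Connect's field list; the field names are taken from the schemas above):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CaseSensitiveFields {
    public static void main(String[] args) {
        // KSQL upper-cases field names when it parses the schema string in @Udf,
        // so the engine-side schema ends up with fields like "_ID" and "RAW_TITLE".
        Set<String> ksqlSchemaFields = new HashSet<>(Arrays.asList("_ID", "RAW_TITLE"));

        // Connect's field lookup is case-sensitive: lower-case names never match.
        System.out.println(ksqlSchemaFields.contains("_id"));       // false
        System.out.println(ksqlSchemaFields.contains("raw_title")); // false

        // Matching succeeds once the programmatic schema uses the same case.
        System.out.println(ksqlSchemaFields.contains("_ID"));       // true
    }
}
```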
I tried to fix my problem the same way, but I ran into the following situation. The UDF:
@UdfDescription(name = "ValueUnpacker", description = "..")
public class ValueUnpacker {
    private Schema valueSchema = SchemaBuilder.struct()
            .field("LABEL", Schema.INT32_SCHEMA)
            .build();

    @Udf(description = "Test a string", schema = "struct<LABEL INT>")
    public Struct unpackValue(@UdfParameter(value = "thingType", description = "a thing type") String thingType) {
        Struct ret = new Struct(valueSchema);
        int i = 5;
        ret.put("LABEL", i);
        System.out.println("Ret: " + ret);
        return ret;
    }
}
Running ksqldb and entering:
ksql> SELECT valueunpacker('test') FROM SOME_STREAM EMIT CHANGES;
|{LABEL=5}
|{LABEL=5}
works fine. But creating the stream as
CREATE STREAM CONSTANT_STREAM AS SELECT valueunpacker('test') FROM SOME_STREAM;
fails and produces no output on the stream. The ksqldb logs show the same problem: "Schemas do not match".