Cannot return a Struct from a KSQL UDF
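I'm trying to write a UDF that accepts a STRUCT as a parameter and returns a STRUCT after making some modifications to the input (updating existing fields and adding new ones), as shown in the code below: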
@UdfDescription(name = "myCustomUdf")
public class MyCustomUdf {
    @Udf(description = "do stuff")
    public Struct MyCustomUdf(@UdfParameter(schema = "struct <NAME VARCHAR, EMAIL VARCHAR>", value = "user") final Struct struct) {
        String processedEmail = struct.getString("EMAIL").toUpperCase();
        struct.put("EMAIL", processedEmail);
        return struct;
    }
}
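However, when I deploy my custom jar, I see the exception below in the KSQL logs. I could not find anything in the documentation saying this is not allowed, so am I missing something here?
Is it possible for a UDF to return a STRUCT?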
io.confluent.ksql.util.KsqlException: Could not load UDF method with signature: public org.apache.kafka.connect.data.Struct com.myudf.MyCustomUdf.MyCustomUdf(org.apache.kafka.connect.data.Struct)
at io.confluent.ksql.function.UdfLoader.getReturnType(UdfLoader.java:373)
at io.confluent.ksql.function.UdfLoader.addFunction(UdfLoader.java:280)
at io.confluent.ksql.function.UdfLoader.lambda$handleUdfAnnotation(UdfLoader.java:217)
at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.lookForMatches(ScanSpec.java:1390)
at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:696)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1606)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1678)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1704)
at io.confluent.ksql.function.UdfLoader.loadUdfs(UdfLoader.java:145)
at io.confluent.ksql.function.UdfLoader.lambda$load(UdfLoader.java:115)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:175)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.confluent.ksql.function.UdfLoader.load(UdfLoader.java:115)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:473)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:441)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:94)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:59)
Caused by: io.confluent.ksql.util.KsqlException: Type inference is not supported for: class org.apache.kafka.connect.data.Struct
at io.confluent.ksql.util.SchemaUtil.handleParametrizedType(SchemaUtil.java:378)
at io.confluent.ksql.util.SchemaUtil.lambda$getSchemaFromType(SchemaUtil.java:158)
at io.confluent.ksql.util.SchemaUtil.getSchemaFromType(SchemaUtil.java:158)
at io.confluent.ksql.util.SchemaUtil.getSchemaFromType(SchemaUtil.java:153)
at io.confluent.ksql.function.UdfLoader.getReturnType(UdfLoader.java:366)
... 26 more
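KSQL needs the schema of the output as well as the schema of the input parameters. The "Type inference is not supported for: class org.apache.kafka.connect.data.Struct" cause in your trace means KSQL could not work out the return schema from the Java return type alone. In your case, make sure to specify the following in the @Udf annotation: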
@Udf(description = "do stuff", schema="STRUCT<name VARCHAR, email VARCHAR>")
public Struct MyCustomUdf(@UdfParameter(schema = "struct <NAME VARCHAR, EMAIL VARCHAR>", value = "user") final Struct struct) {
    ...
}
It looks like this is indeed missing from our documentation! I've filed an issue to make sure it gets addressed (https://github.com/confluentinc/ksql/issues/3699).
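For the "add new fields" part of the question, a fuller sketch may help. A Connect Struct is bound to the schema it was created with, so a new field cannot be put into the input struct; the usual approach is to build a new Struct against a return schema that matches the schema string declared in @Udf. The EMAIL_LENGTH field, the null handling, and the lower-case method name myCustomUdf (so it does not look like a constructor) are illustrative assumptions, not part of the original code. It assumes the KSQL UDF API and Kafka Connect API jars are on the classpath.

```java
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(name = "myCustomUdf", description = "Upper-cases EMAIL and adds EMAIL_LENGTH")
public class MyCustomUdf {

  // SQL-side description of the returned struct, passed to @Udf.
  // EMAIL_LENGTH is a hypothetical extra field used only for illustration.
  private static final String RETURN_SCHEMA_SQL =
      "STRUCT<NAME VARCHAR, EMAIL VARCHAR, EMAIL_LENGTH INT>";

  // Connect-side schema used to build the returned Struct.
  // It has to describe the same fields as RETURN_SCHEMA_SQL.
  private static final Schema RETURN_SCHEMA = SchemaBuilder.struct().optional()
      .field("NAME", Schema.OPTIONAL_STRING_SCHEMA)
      .field("EMAIL", Schema.OPTIONAL_STRING_SCHEMA)
      .field("EMAIL_LENGTH", Schema.OPTIONAL_INT32_SCHEMA)
      .build();

  @Udf(description = "do stuff", schema = RETURN_SCHEMA_SQL)
  public Struct myCustomUdf(
      @UdfParameter(value = "user", schema = "STRUCT<NAME VARCHAR, EMAIL VARCHAR>")
      final Struct user) {
    if (user == null) {
      return null; // assumption: a null input row yields a null result
    }
    final String email = user.getString("EMAIL");
    final String processedEmail = email == null ? null : email.toUpperCase();
    final Integer emailLength = processedEmail == null ? null : processedEmail.length();

    // Build a fresh Struct instead of mutating the input, because the
    // input's schema has no EMAIL_LENGTH field.
    return new Struct(RETURN_SCHEMA)
        .put("NAME", user.getString("NAME"))
        .put("EMAIL", processedEmail)
        .put("EMAIL_LENGTH", emailLength);
  }
}
```

If the UDF only updates existing fields and adds none, returning the modified input struct as in the question should also work once the schema is declared on @Udf.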