使用 postgresql 字段类型文本过滤 kafka 消息时出错

getting error whille filtering kafka messages with postgresql field type text

错误 || WorkerSourceTask{id=ptl_connector-0} 任务抛出一个未捕获且不可恢复的异常 [org.apache.kafka.connect.runtime.WorkerTask] org.apache.kafka.connect.errors.ConnectException: 错误处理程序超出容差 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) 在 org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184) 在 org.apache.kafka.connect.runtime.WorkerTask.运行(WorkerTask.java:234) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 在 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 java.base/java.lang.Thread.run(Thread.java:834) 原因:io.debezium.DebeziumException:为记录 'SourceRecord{sourcePartition={server=testdev_ptl005}, sourceOffset={last_snapshot_record=false, lsn=27649944, txId=707, ts_usec=1594357573069000, snapshot=true}} timestamp=null, headers=ConnectHeaders(headers=)}' 计算表达式 'value.after.brandid == BrandA' 时出错 在 io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:116) 在 io.debezium.transforms.Filter.doApply(Filter.java:33) 在 io.debezium.transforms.ScriptingTransformation.apply(ScriptingTransformation.java:189) 在 org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 11 更多 由以下原因引起:javax.script.ScriptException:groovy.lang.MissingPropertyException:没有这样的 属性:class 的 BrandA:Script1 在 org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:320) 在 org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:71) 在 java.scripting/javax.script.CompiledScript.eval(CompiledScript.java:89) 在 io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:107) ... 还有 16 个 由以下原因引起:groovy.lang.MissingPropertyException:没有这样的 属性:class 的 BrandA:Script1 在 org.codehaus.groovy.运行time.ScriptBytecodeAdapter.unwrap(ScriptBytecodeAdapter.java:65) 在 org.codehaus.groovy.运行time.callsite.PogoGetPropertySite.getProperty(PogoGetPropertySite.java:51) 在 org.codehaus.groovy.运行time.callsite.AbstractCallSite.callGroovyObjectGetProperty(AbstractCallSite.java:341) 在 Script1.run(Script1.groovy:1) 在 org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317) ... 还有 19 个

您是否考虑过使用 kafka 连接器来做到这一点? 也许这会有所帮助: https://docs.confluent.io/current/connect/transforms/filter.html

请将 JAR 文件放入连接器(如 debezium-connector-*)目录

  • groovy-3.0.4.jar
  • groovy-jsr223-3.0.4.jar
  • groovy-json-3.0.4.jar