Failing to generate a filter on debezium connector with error: "op is not a valid field name"

I have created a Debezium connector for a Docker MySQL container. I tried to set a filter for the messages:

    {
        "name": "my_connector",
        "config": {
            "name": "my_connector",
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            
            ...
            
            "include.schema.changes": "true",
            "transforms": "filter, unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "true",
            "transforms.filter.type": "io.debezium.transforms.Filter",
            "transforms.filter.language": "jsr223.groovy",
            "transforms.filter.condition": "value.source.table == 'table-name' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))"
        }
    }

At http://localhost:8070/connectors/my_connector/status I see this:

    { "connector": { "state": "RUNNING", "worker_id": "172.21.0.13:8083" }, "name": "my_connector", "tasks": [ { "id": 0, "state": "FAILED", "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: io.debezium.DebeziumException: Error while evaluating expression 'value.source.table == 'subscription_contract' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))' for record 'SourceRecord{sourcePartition={server=subscription_contracts_db }, sourceOffset={file=binlog.000006, pos=19704, snapshot=true}} ConnectRecord{topic='subscription_contracts_db', kafkaPartition=0, key=Struct{databaseName=subscription-contracts}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT},
    value=Struct{source=Struct{version=1.2.0.Final,connector=mysql,name=subscription_contracts_db,ts_ms=0,snapshot=true,db=subscription-contracts,table=subscription_contract,server_id=0,file=binlog.000006,pos=19704,row=0},databaseName=subscription-contracts,ddl=DROP TABLE IF EXISTS subscription-contracts.subscription_contract}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}'\n\tat io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:116)\n\tat io.debezium.transforms.Filter.doApply(Filter.java:33)\n\tat io.debezium.transforms.ScriptingTransformation.apply(ScriptingTransformation.java:189)\n\tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 11 more\nCaused by: javax.script.ScriptException: org.apache.kafka.connect.errors.DataException: op is not a valid field name\n\tat org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:320)\n\tat org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:71)\n\tat java.scripting/javax.script.CompiledScript.eval(CompiledScript.java:89)\n\tat io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:107)\n\t...
    16 more\nCaused by: org.apache.kafka.connect.errors.DataException: op is not a valid field name\n\tat org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)\n\tat org.apache.kafka.connect.data.Struct.get(Struct.java:74)\n\tat jdk.internal.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:107)\n\tat groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:323)\n\tat org.codehaus.groovy.runtime.metaclass.MethodMetaProperty$GetMethodMetaProperty.getProperty(MethodMetaProperty.java:62)\n\tat org.codehaus.groovy.runtime.callsite.GetEffectivePojoPropertySite.getProperty(GetEffectivePojoPropertySite.java:63)\n\tat org.codehaus.groovy.runtime.callsite.AbstractCallSite.callGetProperty(AbstractCallSite.java:329)\n\tat Script9.run(Script9.groovy:1)\n\tat org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)\n\t... 19 more\n", "worker_id": "172.21.0.13:8083" } ], "type": "source" }

As OneCricketeer pointed out, the underlying problem here is:

    Caused by: javax.script.ScriptException: org.apache.kafka.connect.errors.DataException: op is not a valid field name\n\tat

But I wasn't sure what was wrong with using it, since it looks like it should be a valid field - here.

After some investigation I seem to have found the answer; hopefully it helps someone else.

In my connector configuration I had this:

    "include.schema.changes": "true"

This caused my connector to also emit messages about schema changes to tables in the database. I have a Docker migrator container that sets up the database container by running some Flyway migrations, one of which is the DROP TABLE from my exception above. Since a schema-change message has no reason to contain an `op` field, it simply doesn't (as shown in the example here). So when the filter tried to read the field, it couldn't find it and threw the exception. Changing the config to `false` solved the problem.
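For reference, the fix is just this one line in the connector config:

    "include.schema.changes": "false"

Alternatively (an untested sketch, in case you do want to keep schema-change events on the topic), you could make the Groovy condition tolerant of records that lack an `op` field. The Filter SMT exposes the record value as a Kafka Connect `Struct`, and `value.schema().field('op')` returns null when the field is absent, so the short-circuiting `&&` prevents the failing lookup:

    "transforms.filter.condition": "value.schema().field('op') != null && value.source.table == 'table-name' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))"

Note that with this guard the filter silently drops the schema-change records instead of failing on them, which in my case amounts to the same outcome as disabling `include.schema.changes`.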