Failing to generate a filter on debezium connector with error: "op is not a valid field name"
I've created a Debezium connector for a MySQL container running in Docker.
I tried to set up a filter for the messages:
{
  "name": "my_connector",
  "config": {
    "name": "my_connector",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    ...
    "include.schema.changes": "true",
    "transforms": "filter, unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "value.source.table == 'table-name' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))"
  }
}
At http://localhost:8070/connectors/my_connector/status
I see this:
{
"connector":
{
"state": "RUNNING",
"worker_id": "172.21.0.13:8083"
},
"name": "my_connector",
"tasks":
[
{
"id": 0,
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
in error handler\n\tat
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
java.base/java.lang.Thread.run(Thread.java:834)\nCaused by:
io.debezium.DebeziumException: Error while evaluating expression
'value.source.table == 'subscription_contract' && (value.op == 'd' ||
value.op == 'c' || (value.op == 'u' && value.after.status !=
value.before.status))' for record
'SourceRecord{sourcePartition={server=subscription_contracts_db },
sourceOffset={file=binlog.000006, pos=19704, snapshot=true}}
ConnectRecord{topic='subscription_contracts_db', kafkaPartition=0,
key=Struct{databaseName=subscription-contracts},
keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT},
value=Struct{source=Struct{version=1.2.0.Final,connector=mysql,name=subscription_contracts_db,ts_ms=0,snapshot=true,db=subscription-contracts,table=subscription_contract,server_id=0,file=binlog.000006,pos=19704,row=0},databaseName=subscription-contracts,ddl=DROP TABLE IF EXISTS subscription-contracts.subscription_contract},
valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)}'\n\tat
io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:116)\n\tat
io.debezium.transforms.Filter.doApply(Filter.java:33)\n\tat
io.debezium.transforms.ScriptingTransformation.apply(ScriptingTransformation.java:189)\n\tat
org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t...
11 more\nCaused by: javax.script.ScriptException:
org.apache.kafka.connect.errors.DataException: op is not a valid field
name\n\tat
org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:320)\n\tat org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:71)\n\tat
java.scripting/javax.script.CompiledScript.eval(CompiledScript.java:89)\n\tat
io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:107)\n\t...
16 more\nCaused by: org.apache.kafka.connect.errors.DataException: op
is not a valid field name\n\tat
org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)\n\tat
org.apache.kafka.connect.data.Struct.get(Struct.java:74)\n\tat
jdk.internal.reflect.GeneratedMethodAccessor1.invoke(Unknown
Source)\n\tat
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat
java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat
org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:107)\n\tat
groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:323)\n\tat
org.codehaus.groovy.runtime.metaclass.MethodMetaProperty$GetMethodMetaProperty.getProperty(MethodMetaProperty.java:62)\n\tat
org.codehaus.groovy.runtime.callsite.GetEffectivePojoPropertySite.getProperty(GetEffectivePojoPropertySite.java:63)\n\tat
org.codehaus.groovy.runtime.callsite.AbstractCallSite.callGetProperty(AbstractCallSite.java:329)\n\tat
Script9.run(Script9.groovy:1)\n\tat
org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317)\n\t... 19 more\n",
"worker_id": "172.21.0.13:8083"
}
],
"type": "source" }
As OneCricketeer pointed out, the underlying problem here is:
Caused by: javax.script.ScriptException: org.apache.kafka.connect.errors.DataException: op is not a valid field name\n\tat
But I wasn't sure what was wrong with using it, since it looks like op should be a valid field - here.
After some digging I seem to have found the answer; hopefully it helps someone else.
In my connector configuration I had this setting:
"include.schema.changes": "true"
This caused my connector to also emit records for schema changes in the database.
I have a Docker migrator container that bootstraps the database container by running some Flyway migrations, one of which is the DROP TABLE shown in my exception above.
Since a schema-change message has no reason to contain an op field, it simply doesn't (as shown in the example here).
So when the filter tried to read that field, it couldn't find it and threw the exception.
Changing the setting to false solved the problem.
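A minimal sketch of the corrected connector config (only the relevant change from the question; the ellipsis stands for the other settings, unchanged):

```json
{
  "name": "my_connector",
  "config": {
    "name": "my_connector",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    ...
    "include.schema.changes": "false",
    "transforms": "filter, unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "value.source.table == 'table-name' && (value.op == 'd' || value.op == 'c' || (value.op == 'u' && value.after.status != value.before.status))"
  }
}
```

If you do want to keep schema-change events, an alternative (untested) approach is to guard the condition so it only dereferences op when the field actually exists, e.g. prefixing it with valueSchema.field('op') != null && (the Debezium scripting SMT exposes valueSchema alongside value).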