Kafka connector config error: filter.condition: Invalid json path defined

Kafka connector config error: filter.condition: Invalid json path defined

我正在尝试使用 Confluent 的 Filter SMT with Debezium example unwrap-smt

我将以下配置添加到源连接器 (Debezium MySQL) 配置:

    "transforms": "route,csFilter",
    ...
    ...
    "transforms.csFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.csFilter.filter.condition": "$.payload.after.source == 2",
    "transforms.csFilter.filter.type": "exclude",
    "transforms.csFilter.missing.or.null.behavior": "fail"

由于这个 Filter SMT 是由 Confluent 提供的,我下载了 jar file 并将(connect-transforms,connect-utils,json-path)jar 文件复制到 path-to-kafka/connect/debezium-connector-mysql 目录.

当我尝试注册 Debezium MySQL 源连接器时,

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 
localhost:8083/connectors/ -d @source_connector_config.json

我收到这个错误:

{"error_code":400,
"message":"Connector configuration is invalid and contains the following 1 error(s):\n
Invalid value $.payload.after.source == 2 for configuration filter.condition: Invalid json path defined. 
Please refer to https://github.com/json-path/JsonPath README for correct use of json path.\n
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

我用 this guide 中提供的示例检查了 JSON 路径表达式。好像还行。

你能给我指明正确的方向吗?我错过了什么? 谢谢。

请尝试使用这个条件:$.payload.after[?(@.source == 2)]