无法通过 JDBC 源连接器触发自定义生产者拦截器
Unable to trigger Custom Producer Interceptor via JDBC Source Connector
我创建了一个自定义的生产者拦截器 (AuditProducerInterceptor),它接受一些自定义配置(application_id、类型等)。我从 AuditProducerInterceptor 项目生成了一个 jar,并将该 jar 放在 /usr/share/java/monitoring-interceptors 的 kafka-connect 中。当我尝试使用以下配置 post JDBC-Source 连接器时,我的审计拦截器没有被触发。
{
"name": "jdbc-source-xx-xxxx-xxx-xxx",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://{{ip}}:1433;databaseName=XX;useNTLMv2=true",
"connection.user": "SA",
"connection.password": "Admin1234",
"producer.interceptor.classes": "com.optum.payer.common.kafka.audit.interceptor.AuditProducerInterceptor",
"topic.prefix": "MyTestTopic",
"query": "SELECT ID, chart_id, request_id, UpdatedDate FROM xxx.xxx WITH (NOLOCK)",
"mode": "timestamp",
"timestamp.column.name": "UpdatedDate",
"producer.audit.application.id": "HelloApplication",
"producer.audit.type": "test type",
"poll.interval.ms": "10",
"tasks.max": "1",
"batch.max.rows": "100",
"validate.non.null": "false",
"numeric.mapping":"best_fit",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://{{ip}}:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://{{ip}}:8081"
}}
正如您在配置中看到的,我在连接器配置中添加了以下道具以触发自定义拦截器。但是我在 Kafka-Connect 中没有看到任何与 AuditProducerInterceptor 相关的日志。
"producer.interceptor.classes": "com.optum.payer.common.kafka.audit.interceptor.AuditProducerInterceptor"
"producer.audit.application.id": "HelloApplication",
"producer.audit.type": "test type"
我尝试在 kafka-connect 配置中添加这三个配置,我能够触发拦截器。但我想通过 JDBC 源连接器触发拦截器,以便我可以通过连接器传递自定义道具(application_id、类型等)。
请帮我解决这个问题
如果您在 Connect worker 中允许客户端覆盖(默认启用),您将需要使用 producer.override
前缀
来自文档
Starting with 2.3.0, client configuration overrides can be configured individually per connector by using the prefixes producer.override.
and consumer.override.
for Kafka sources or Kafka sinks respectively.
我创建了一个自定义的生产者拦截器 (AuditProducerInterceptor),它接受一些自定义配置(application_id、类型等)。我从 AuditProducerInterceptor 项目生成了一个 jar,并将该 jar 放在 /usr/share/java/monitoring-interceptors 的 kafka-connect 中。当我尝试使用以下配置 post JDBC-Source 连接器时,我的审计拦截器没有被触发。
{
"name": "jdbc-source-xx-xxxx-xxx-xxx",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:sqlserver://{{ip}}:1433;databaseName=XX;useNTLMv2=true",
"connection.user": "SA",
"connection.password": "Admin1234",
"producer.interceptor.classes": "com.optum.payer.common.kafka.audit.interceptor.AuditProducerInterceptor",
"topic.prefix": "MyTestTopic",
"query": "SELECT ID, chart_id, request_id, UpdatedDate FROM xxx.xxx WITH (NOLOCK)",
"mode": "timestamp",
"timestamp.column.name": "UpdatedDate",
"producer.audit.application.id": "HelloApplication",
"producer.audit.type": "test type",
"poll.interval.ms": "10",
"tasks.max": "1",
"batch.max.rows": "100",
"validate.non.null": "false",
"numeric.mapping":"best_fit",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://{{ip}}:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://{{ip}}:8081"
}}
正如您在配置中看到的,我在连接器配置中添加了以下道具以触发自定义拦截器。但是我在 Kafka-Connect 中没有看到任何与 AuditProducerInterceptor 相关的日志。
"producer.interceptor.classes": "com.optum.payer.common.kafka.audit.interceptor.AuditProducerInterceptor"
"producer.audit.application.id": "HelloApplication",
"producer.audit.type": "test type"
我尝试在 kafka-connect 配置中添加这三个配置,我能够触发拦截器。但我想通过 JDBC 源连接器触发拦截器,以便我可以通过连接器传递自定义道具(application_id、类型等)。 请帮我解决这个问题
如果您在 Connect worker 中允许客户端覆盖(默认启用),您将需要使用 producer.override
前缀
来自文档
Starting with 2.3.0, client configuration overrides can be configured individually per connector by using the prefixes
producer.override.
andconsumer.override.
for Kafka sources or Kafka sinks respectively.