如何修改kafka连接器?
How to modify kafka connector?
我在 docker 上使用终端上的 curl 创建了一个 debezium 连接器,但我一直在修改现有的连接器。
我的 docker 文件:
---
version: '3'
services:
kafka-connect-02:
image: confluentinc/cp-kafka-connect:latest
container_name: kafka-connect-02
ports:
- 8083:8083
environment:
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CUB_KAFKA_TIMEOUT: 300
CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect-group-01-v04
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
# Confluent Cloud config
CONNECT_REQUEST_TIMEOUT_MS: "20000"
CONNECT_RETRY_BACKOFF_MS: "500"
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_SASL_MECHANISM: "PLAIN"
CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**";"
#
CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**";"
CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
#
CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**";"
CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
# External secrets config
# See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
CONNECT_CONFIG_PROVIDERS: 'file'
CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
command:
- bash
- -c
- |
echo "Installing connector plugins"
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:0.10.0
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.5
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
我的 debezium 连接器:
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/Procura_CDC/config -d '{ "connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":"1",
"database.server.name":"***",
"database.hostname":"***",
"database.port":"***",
"database.user":"Kafka",
"database.password":"***",
"database.dbname":"Procura_Prod",
"database.history.kafka.bootstrap.servers":"*****",
"database.history.kafka.topic":"dbhistory.procura",
"table.whitelist":"dbo.CLIENTS,dbo.VISITS",
"poll.interval.ms":"2000",
"snapshot.fetch.size":"2000",
"snapshot.mode":"initial",
"snapshot.isolation.mode":"snapshot",
"transforms":"unwrap,dropPrefix",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"procura.dbo.(.*)",
"transforms.dropPrefix.replacement":"" }'
我在修改已使用上述代码创建的 debezium 连接器时遇到错误。使用 PUT
或 POST
方法也不起作用。报错"curl: (7) failed to connect to localhost port 8083: Connection refused"
or "curl: (56) Recv failure: connection reset by peer."
or "error_code":500,"message":"Request timed out"
不知道怎么修改。 docker 的新手,如有任何帮助,我们将不胜感激。谢谢
您可以使用 PUT
创建和更新连接器配置。这是一个例子:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-file-01/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/totail.txt",
"topic": "foo",
"tasks.max": 6
}'
这将创建(或修改)名为 source-file-01
的连接器。如果你想改变它的配置,你可以在改变必要的值的同时重新发出 PUT
。
这种重新运行命令的能力是我在创建连接器时总是更喜欢PUT
而不是POST
的原因,因为你不需要改变你[=22]的方式=] 它基于连接器是否已经存在。
我在 docker 上使用终端上的 curl 创建了一个 debezium 连接器,但我一直在修改现有的连接器。
我的 docker 文件:
---
version: '3'
services:
kafka-connect-02:
image: confluentinc/cp-kafka-connect:latest
container_name: kafka-connect-02
ports:
- 8083:8083
environment:
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CUB_KAFKA_TIMEOUT: 300
CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect-group-01-v04
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
# Confluent Cloud config
CONNECT_REQUEST_TIMEOUT_MS: "20000"
CONNECT_RETRY_BACKOFF_MS: "500"
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_SASL_MECHANISM: "PLAIN"
CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**";"
#
CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**";"
CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
#
CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**";"
CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
# External secrets config
# See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
CONNECT_CONFIG_PROVIDERS: 'file'
CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
command:
- bash
- -c
- |
echo "Installing connector plugins"
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:0.10.0
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.5
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
我的 debezium 连接器:
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/Procura_CDC/config -d '{ "connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":"1",
"database.server.name":"***",
"database.hostname":"***",
"database.port":"***",
"database.user":"Kafka",
"database.password":"***",
"database.dbname":"Procura_Prod",
"database.history.kafka.bootstrap.servers":"*****",
"database.history.kafka.topic":"dbhistory.procura",
"table.whitelist":"dbo.CLIENTS,dbo.VISITS",
"poll.interval.ms":"2000",
"snapshot.fetch.size":"2000",
"snapshot.mode":"initial",
"snapshot.isolation.mode":"snapshot",
"transforms":"unwrap,dropPrefix",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"procura.dbo.(.*)",
"transforms.dropPrefix.replacement":"" }'
我在修改已使用上述代码创建的 debezium 连接器时遇到错误。使用 PUT
或 POST
方法也不起作用。报错"curl: (7) failed to connect to localhost port 8083: Connection refused"
or "curl: (56) Recv failure: connection reset by peer."
or "error_code":500,"message":"Request timed out"
不知道怎么修改。 docker 的新手,如有任何帮助,我们将不胜感激。谢谢
您可以使用 PUT
创建和更新连接器配置。这是一个例子:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-file-01/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/totail.txt",
"topic": "foo",
"tasks.max": 6
}'
这将创建(或修改)名为 source-file-01
的连接器。如果你想改变它的配置,你可以在改变必要的值的同时重新发出 PUT
。
这种重新运行命令的能力是我在创建连接器时总是更喜欢PUT
而不是POST
的原因,因为你不需要改变你[=22]的方式=] 它基于连接器是否已经存在。