How to modify a Kafka connector?

I created a Debezium connector with curl from the terminal against my Docker setup, but I'm stuck trying to modify the existing connector.

My Docker Compose file:

---
version: '3'
services:

  kafka-connect-02:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect-02
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      # External secrets config
      # See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    command: 
      - bash 
      - -c 
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:0.10.0
        confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.5
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &  

        #
        sleep infinity
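
Once the worker reports it has started, a quick sanity check (assuming the 8083 port mapping above) is to ask the REST API which plugins it loaded; the two connectors installed by the command block should appear in the list:

# List the connector plugins the worker has loaded (standard Connect REST endpoint)
curl -s http://localhost:8083/connector-plugins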

My Debezium connector:

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/Procura_CDC/config \
    -d '{
"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":"1",
"database.server.name":"***",
"database.hostname":"***",
"database.port":"***",
"database.user":"Kafka",
"database.password":"***",
"database.dbname":"Procura_Prod",
"database.history.kafka.bootstrap.servers":"*****",
"database.history.kafka.topic":"dbhistory.procura",
"table.whitelist":"dbo.CLIENTS,dbo.VISITS",
"poll.interval.ms":"2000",
"snapshot.fetch.size":"2000",
"snapshot.mode":"initial",
"snapshot.isolation.mode":"snapshot",
"transforms":"unwrap,dropPrefix",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"procura.dbo.(.*)",
"transforms.dropPrefix.replacement":"" }'

I get an error whenever I try to modify the Debezium connector created with the code above; neither PUT nor POST works. The responses are "curl: (7) failed to connect to localhost port 8083: Connection refused", "curl: (56) Recv failure: connection reset by peer.", or "error_code":500,"message":"Request timed out". I don't know how to modify it. I'm new to Docker, so any help would be greatly appreciated. Thanks.
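
A note on the first two errors: "Connection refused" and "connection reset by peer" mean curl never got a usable response from the worker at all, so before retrying any PUT it's worth checking (using the container name from the compose file above) that the container is still up and that the worker didn't die during startup:

# Is the container running and is port 8083 mapped?
docker ps --filter name=kafka-connect-02

# Look for startup errors in the worker log
docker logs kafka-connect-02 | grep -i error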

You can use PUT to both create and update a connector's configuration. Here's an example:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-file-01/config \
    -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/tmp/totail.txt",
    "topic": "foo",
    "tasks.max": 6
}'    

This creates (or modifies) a connector called source-file-01. If you want to change its configuration, re-issue the PUT with the necessary values changed.
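
For example, to run the same connector with a different number of tasks, re-issue the full configuration with just that value changed. Note that PUT replaces the whole configuration, so every key must be sent each time:

# Same connector, same endpoint, one value changed
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-file-01/config \
    -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/tmp/totail.txt",
    "topic": "foo",
    "tasks.max": 2
}'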

This ability to re-run the command is why I always prefer PUT over POST when creating connectors: you don't have to vary how you make the call depending on whether or not the connector already exists.
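
For comparison, creating with POST targets the /connectors endpoint with a name-plus-config envelope, and it fails if a connector with that name already exists, which is why the same command can't simply be re-run:

# POST creates only; re-running it against an existing connector returns an error
curl -i -X POST -H "Content-Type:application/json" \
    http://localhost:8083/connectors \
    -d '{
    "name": "source-file-01",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "/tmp/totail.txt",
        "topic": "foo"
    }
}'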