Tasks are empty after deploying ElasticsearchSinkConnector

I tried to deploy ElasticsearchSinkConnector with

POST /connectors

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "tasks.max": "1",
        "topics": "my_db_server.public.my_table",
        "connection.url": "https://my-elasticsearch.com:9200",
        "connection.username": "xxx",
        "connection.password": "xxx",
        "key.ignore": "true",
        "schema.ignore": "true",
        "elastic.security.protocol": "SSL",
        "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
        "elastic.https.ssl.keystore.password": "xxx",
        "elastic.https.ssl.key.password": "xxx",
        "elastic.https.ssl.keystore.type": "JKS",
        "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
        "elastic.https.ssl.truststore.password": "xxx",
        "elastic.https.ssl.truststore.type": "JKS",
        "elastic.https.ssl.protocol": "TLS"
    }
}

It deploys successfully. However, when I check the status with

GET /connectors/elasticsearch-sink/status

tasks is an empty array []:

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [],
    "type": "sink"
}
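
The same check can be scripted. This is a minimal sketch that assumes the JSON shape shown in the status response above; the helper name `diagnose_status` is hypothetical and not part of Kafka Connect:

```python
import json


def diagnose_status(status: dict) -> str:
    """Summarize a Kafka Connect status response (shape as shown above)."""
    connector_state = status.get("connector", {}).get("state")
    tasks = status.get("tasks", [])
    if not tasks:
        # The connector reports a state, but no task exists yet.
        return f"connector {connector_state}, no tasks created"
    failed = [t for t in tasks if t.get("state") == "FAILED"]
    if failed:
        ids = ", ".join(str(t["id"]) for t in failed)
        return f"connector {connector_state}, failed tasks: {ids}"
    return f"connector {connector_state}, no failed tasks"


# The status response from the question, pasted as a string.
response_body = """
{
    "name": "elasticsearch-sink",
    "connector": {"state": "RUNNING", "worker_id": "10.xxx.xxx.xxx:8083"},
    "tasks": [],
    "type": "sink"
}
"""
print(diagnose_status(json.loads(response_body)))
```

A connector in state RUNNING with an empty tasks list is the case worth flagging, since the connector state alone looks healthy.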

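I found this question: Kafka Connect: No tasks created for a connector

However, I tried both answers there (changing the name, and deleting the ElasticsearchSinkConnector and redeploying it several times), and neither worked for me.

Also, there are no logs in the Kafka Connect pod.

Any ideas? Thanks!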

After adding these two properties to the ElasticsearchSinkConnector config

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

so that the full request is

POST /connectors

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "tasks.max": "1",
        "topics": "my_db_server.public.my_table",
        "connection.url": "https://my-elasticsearch.com:9200",
        "connection.username": "xxx",
        "connection.password": "xxx",
        "key.ignore": "true",
        "schema.ignore": "true",
        "elastic.security.protocol": "SSL",
        "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
        "elastic.https.ssl.keystore.password": "xxx",
        "elastic.https.ssl.key.password": "xxx",
        "elastic.https.ssl.keystore.type": "JKS",
        "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
        "elastic.https.ssl.truststore.password": "xxx",
        "elastic.https.ssl.truststore.type": "JKS",
        "elastic.https.ssl.protocol": "TLS",
        "errors.log.include.messages": "true",
        "errors.log.enable": "true"
    }
}

this time, when I check the status with

GET /connectors/elasticsearch-sink/status

tasks now shows an error:

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.xxx.xxx.xxx:8083",
            "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
        }
    ],
    "type": "sink"
}

Now I just need to fix the permission issue.
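
For context on the error: a sink connector's tasks consume through a consumer group that Kafka Connect names `connect-<connector name>` by default, which is why the trace mentions `connect-elasticsearch-sink`. Below is a small sketch that lists what the principal used by the Connect consumer would need at least Read access to. It assumes that default group naming and a comma-separated `topics` setting; `required_read_access` is a hypothetical helper, and how the ACLs are actually granted depends on how your cluster manages authorization.

```python
def required_read_access(connector_name: str, config: dict) -> dict:
    """List the Kafka resources a sink connector's consumer reads from.

    Assumes the default consumer group name, "connect-<connector name>",
    and that topics are given via the comma-separated "topics" setting.
    """
    topics = [t.strip() for t in config.get("topics", "").split(",") if t.strip()]
    return {"group": f"connect-{connector_name}", "topics": topics}


access = required_read_access(
    "elasticsearch-sink",
    {"topics": "my_db_server.public.my_table"},
)
print(access["group"])   # the group named in the GroupAuthorizationException
print(access["topics"])
```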

Also, I found a good article on how to debug this: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/