Kafka Connect tasks die randomly with NPE

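I'm running a custom Kafka Connect plugin that dumps data from Kafka into ElasticSearch, and my tasks periodically die for no apparent reason (anywhere from a few hours to a day after starting).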

I see this in the connector status:

curl -s http://localhost:8083/connectors/my-custom-sink/status | python -m json.tool
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "MASKED_IP1:8083"
    },
    "name": "my-custom-sink",
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access00(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
            "worker_id": "MASKED_IP1:8083"
        },
        {
            "id": 1,
            "state": "FAILED",
            "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access00(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
            "worker_id": "MASKED_IP2:8083"
        },
        {
            "id": 2,
            "state": "RUNNING",
            "worker_id": "MASKED_IP3:8083"
        }
    ]
}

Any insight would be appreciated (i.e. what is Kafka trying to do when it throws the NPE?).

Update: this is Kafka 0.10.2.0.

Here is what I see in the log right before the worker dies:

INFO  2017-07-30 07:38:17,146  [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.10.2.0
INFO  2017-07-30 07:38:17,146  [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 576d93a8dc0cf421
INFO  2017-07-30 07:38:17,172  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN:9092 (id: 2147483643 rack: null) for group my-group.
INFO  2017-07-30 07:38:17,223  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,223  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,223  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,224  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,224  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,226  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO  2017-07-30 07:38:17,228  [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Finished reading KafkaBasedLog for topic connect-configs
INFO  2017-07-30 07:38:17,228  [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Started KafkaBasedLog for topic connect-configs
INFO  2017-07-30 07:38:17,228  [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Started KafkaConfigBackingStore
INFO  2017-07-30 07:38:17,233  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Herder started
INFO  2017-07-30 07:38:17,239  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN1:9092 (id: 2147483643 rack: null) for group my-group.
INFO  2017-07-30 07:38:17,241  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: (Re-)joining group my-group
INFO  2017-07-30 07:38:20,868  [CLASSPATH traversal thread.] org.reflections.Reflections: Reflections took 3600 ms to scan 1 urls, producing 7612 keys and 36389 values 
INFO  2017-07-30 07:38:22,511  [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Successfully joined group my-group with generation 804011
INFO  2017-07-30 07:38:22,516  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Joined group and got assignment: Assignment{error=0, leader='connect-1-29930623-f17a-43ca-bed9-7f8a6fb5317a', leaderUrl='http://MASKED_IP:8083/', offset=2, connectorIds=[], taskIds=[my-group]}
WARN  2017-07-30 07:38:22,521  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Catching up to assignment's config offset.
INFO  2017-07-30 07:38:22,521  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Current config state offset -1 is behind group assignment 2, reading to end of config log
INFO  2017-07-30 07:38:22,846  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished reading to end of log and updated config snapshot, new config log offset: 2
INFO  2017-07-30 07:38:22,846  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting connectors and tasks using config offset 2
INFO  2017-07-30 07:38:22,849  [pool-6-thread-1] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting task my-group
INFO  2017-07-30 07:38:22,849  [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Creating task my-group
INFO  2017-07-30 07:38:22,851  [pool-6-thread-1] org.apache.kafka.connect.runtime.ConnectorConfig: ConnectorConfig values: 
        connector.class = IndexerSinkConnector
        key.converter = null
        name = my-group
        tasks.max = 3
        transforms = null
        value.converter = null

ERROR 2017-07-30 07:38:22,856  [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Failed to start task my-group
java.lang.NullPointerException: null
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)
        at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access00(DistributedHerder.java:101)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:848)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:844)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
INFO  2017-07-30 07:38:22,890  [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished starting connectors and tasks
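In the log above, the connector-level config is found and printed (the ConnectorConfig values), and the failure comes immediately afterwards, inside the TaskConfig constructor. Assuming line 52 of AbstractConfig in 0.10.2.0 is the loop that checks that every key of the `originals` map is a String, an NPE there most likely means the map passed in was null, i.e. the worker was asked to start a task whose config it could not find in its config snapshot. The toy below reproduces only that mechanism; `KeyCheckDemo`, `checkKeys` and `tryStart` are hypothetical names, not Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, not Kafka's code) of the key check that
// AbstractConfig's constructor is assumed to run on the map it receives.
public class KeyCheckDemo {

    // Mimics the "all keys must be strings" loop: calling keySet() on a
    // null map is what throws the NullPointerException.
    static void checkKeys(Map<?, ?> originals) {
        for (Object key : originals.keySet()) {
            if (!(key instanceof String)) {
                throw new IllegalArgumentException("Key must be a string: " + key);
            }
        }
    }

    // Reports what happens when a task is started with the given config map.
    static String tryStart(Map<?, ?> taskProps) {
        try {
            checkKeys(taskProps);
            return "started";
        } catch (NullPointerException e) {
            return "NPE";
        }
    }

    public static void main(String[] args) {
        Map<String, String> found = new HashMap<>();
        found.put("task.class", "SomeSinkTask"); // hypothetical config entry
        System.out.println(tryStart(found)); // prints: started
        System.out.println(tryStart(null));  // prints: NPE (config missing from snapshot)
    }
}
```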

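The NPE originates here: https://github.com/apache/kafka/blob/0.10.2.0/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L52

Does your config topic have more than one partition? The config topic must have exactly one partition. This requirement is strictly enforced starting with 0.11, so later versions should not run into this problem.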
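Why a multi-partition config topic can matter: Kafka only guarantees ordering within a single partition. Connect stores a connector's config, its task configs and a commit marker as separate records with different keys, so on a multi-partition topic they may land in different partitions and be read back in a different order than they were written. The sketch below is a toy simulation under that assumption: the record keys, the `hashCode`-based partitioner and the partition-by-partition read order are simplifications, not the real producer partitioner or Connect's KafkaConfigBackingStore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation (not Connect's code): records are written in order to a
// topic with N partitions, then read back one partition at a time.
public class ConfigTopicOrderDemo {

    // Simplified partitioner: non-negative hash of the key modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each key to its partition, then concatenates partitions 0..N-1.
    static List<String> writeThenRead(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        List<String> readOrder = new ArrayList<>();
        for (List<String> partition : partitions) {
            readOrder.addAll(partition);
        }
        return readOrder;
    }

    public static void main(String[] args) {
        // Hypothetical record keys, in the order a connector update might write them.
        List<String> written = Arrays.asList(
                "connector-my-sink", "task-my-sink-0", "task-my-sink-1", "commit-my-sink");
        // prints: 1 partition:  [connector-my-sink, task-my-sink-0, task-my-sink-1, commit-my-sink]
        System.out.println("1 partition:  " + writeThenRead(written, 1));
        // Order here depends on how the keys hash; it may differ from the write order.
        System.out.println("3 partitions: " + writeThenRead(written, 3));
    }
}
```

With one partition the read-back order always equals the write order; with several it depends on the keys' hashes, so a reordering is possible but not guaranteed on any particular run.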