Kafka Connect 分布式模式组协调器不可用

Kafka Connect Distributed mode The group coordinator is not available

我已经尝试了两个星期了,我 运行 Kafka 集群在不同的机器上而不是我的连接节点。我无法正确连接 运行。我可以毫无问题地读写kafka。 Zookeeper 似乎 运行 没问题。

我启动连接:

$ bin/connect-distributed connect-distributed.properties

Connect 不断循环显示此错误:

[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)

这是我的连接-distributed.properties 的样子:

bootstrap.servers=172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=172.25.1.5
rest.port=8083

heartbeat.interval.ms=3000
session.timeout.ms=30000
security.protocol=PLAINTEXT
client.id=c1

plugin.path=/usr/share/java

__Consumer_offsets 主题如下:

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets                                       
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets       Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 1    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
    Topic: __consumer_offsets       Partition: 4    Leader: 2       Replicas: 2     Isr: 2.... etc

在用 Go 编写连接器后,我遇到了同样的问题。我被迫自己解决它。

当连接器连接到 kafka 时,它会自动写入主题和 __offset_topics。当连接器崩溃时,它会在那些 table 中留下自己作为协调器的痕迹。当一个新的连接器启动时,它会在 table 中找到记录并尝试与协调器通信。协调器无法响应,连接器永远无法工作。

您可以通过以下两种方式解决此问题:删除所有主题(connect-configsconnect-offsetsconnect-status__offset_topics)并重新启动集群。另一种方法是从主题中删除协调器,我目前不确定如何执行。

将所有 kafka 代理的主机名添加到您的 /etc/hosts 文件,然后重试

发帖给可能觉得有用的其他人。

我遇到了同样的问题...重新启动 kafka 解决了我的问题。

执行后:

service kafka status

在不到 10 秒的时间内使我的日志正常:

2019-11-08 14:30:19.781  INFO [-,,,] 1 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=datasources-ca-contacts] Discovered group coordinator myserver:9092 (id: 2147483647 rack: null)

当我 运行 我的代理磁盘 space 不足时,我得到了这个错误。可能值得检查。