第二个和第三个分布式 Kafka 连接器工作人员无法正常工作

Second and Third Distributed Kafka Connector workers failing to work correctly

对于一个 3 的 Kafka 集群和一个相同的 Zookeeper 集群,我建立了一个分布式连接器节点。此节点运行 成功完成单个任务。然后我提出了第二个连接器,这似乎 运行 因为任务中的一些代码肯定 运行。然而,它似乎并没有保持活力(虽然没有抛出任何错误,但由于缺乏预期 activity 而观察到没有保持活力,而第一个连接器继续正常运行)。当我在每个连接器节点上调用 URL http://localhost:8083/connectors/mqtt/tasks 时,它告诉我连接器有一个任务。我希望这是两个任务,每个 node/worker 一个。 (目前工作人员配置说 tasks.max = 1 但我也尝试将其设置为 3.

当我尝试启动第三个连接器时,出现错误:

"POST /connectors HTTP/1.1" 500 90  5 
(org.apache.kafka.connect.runtime.rest.RestServer:60)

ERROR IO error forwarding REST request: 
(org.apache.kafka.connect.runtime.rest.RestServer:241) 
java.net.ConnectException: Connection refused

尝试再次从 shell returns 调用连接器 POST 方法错误:

 {"error_code":500,"message":"IO Error trying to forward REST request:
 Connection refused"}

我也尝试升级到今天发布的 Apache Kafka 0.10.1.1。我仍然看到问题。每个连接器 运行 都位于由单个图像定义的隔离 Docker 容器上。它们应该是相同的。

问题可能是我试图 运行 POST 对每个工作人员的 http://localhost:8083/connectors 请求,而我只需要 运行 它一次一个工人,然后该连接器的任务将自动分配给其他工人。如果是这种情况,我该如何分配任务?我目前将最大值设置为三个,但似乎只有一个 运行ning 在一个工人身上。

更新

我最终 运行ning 使用与 Yuri 建议的基本相同的方法得到了东西。我给每个工作人员一个唯一的组 ID,然后给每个连接器任务一个相同的名称。这允许三个连接器及其单个任务共享一个偏移量,因此在接收器连接器的情况下,它们从 Kafka 消费的消息不会重复。它们基本上 运行ning 作为独立的连接器,因为工作人员具有不同的组 ID,因此不会相互通信。

如果连接器工作人员具有相同的组 ID,则不能添加多个同名连接器。如果您为连接器指定不同的名称,它们将具有不同的偏移量并消耗重复的消息。如果您在同一组中有三个工作人员,一个连接器和三个任务,理论上您会有一个理想的情况,任务共享一个偏移量并且工作人员确保任务始终 运行ning 并且分布良好(每个任务消耗一组唯一的分区)。实际上,连接器框架不会创建多个任务,即使 tasks.max 设置为 3 并且当主题任务正在消耗时有 25 个分区。

如果有人知道我为什么会看到这种行为,请告诉我。

我遇到过与你相同情况的类似问题。

  1. Task.max 为主题配置,分布式工作人员自动决定哪些节点处理主题。因此,如果您的集群中有 3 个工作人员并且您的主题配置为 task.max=2,那么 3 个工作人员中只有 2 个将处理该主题。理论上,如果其中一名工人失败,第三名工人应该承担工作量。但是..
  2. 事实证明,分布式连接器非常不可靠:一旦 add\remove 某些节点,集群就会崩溃,所有工作人员除了尝试选择领导者外什么都不做,但失败了。解决的唯一方法是重新启动整个集群,最好同时重新启动所有工作人员。

我选择了另一种方式——我使用了独立的 worker,它对我来说很有魅力,因为负载分配是在 Kafka 客户端级别实现的,一旦某个 worker 掉线,集群会自动重新平衡,客户端会连接到未占用的主题.

PS。也许它对你也有用。 Confluent 连接器不能容忍与主题模式不匹配的无效负载。一旦连接器收到一些无效消息,它就会静静地死去。找出答案的唯一方法是分析指标。

我发布了一个旧问题的答案,因为 Kafka Connect has moved on a lot in three years

在最新版本 (2.3.1) 中,incremental rebalancing 极大地改进了 Kafka Connect 的行为。

还值得注意的是,在配置 Kafka Connect 时必须正确设置 rest.advertised.host.name,否则您会看到错误,包括引用的

{"error_code":500,"message":"IO Error trying to forward REST request: Connection refused"}

有关详细信息,请参阅 this post