Kafka-Connect:在分布式模式下创建一个新的连接器就是创建一个新组

Kafka-Connect: Creating a new connector in distributed mode is creating new group

我目前正在使用 confluent 3.0.1 平台。我正在尝试在两个不同的工作人员上创建 2 个连接器,但尝试创建一个新的连接器正在为其创建一个新组。

Two connectors were created using below details:

1) POST http://devmetric.com:8083/connectors

{
    "name": "connector1",
    "config": {
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka1.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    }
}

2) POST http://devkafka01.com:8083/connectors

{
    "name": "connector2",
    "config": {
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    }
}

但是他们都是在不同的组id下创建的。在此之后我查询了现有的组。

$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091  --new-consumer  --list

Result was:
connect-connector2
connect-connector1

这些组是Kafka connect自动创建的,不是我给的。我在 worker.properties 中给出了不同的 group.id。但我希望两个连接器都在同一个组中,以便它们并行工作以共享 messages.As 现在我有 100 万个关于主题 "dev.ps_primary_delivery" 的数据,我希望两个连接器各获得 50 万个数据。

请告诉我该怎么做。

我认为需要进行一些说明...

  1. group.id 在 worker.properties 文件中没有引用消费者组。这是一个 "worker group" - 同一个工作组中的多个工作人员将在他们之间分配工作 - 所以如果同一个连接器有很多任务(例如 JDBC 连接器有一个任务用于每个 table ), 这些任务将分配给组中的所有工人。

  2. 接收器连接器确实有属于消费者组的消费者。该组的group.id始终是"connect-"+连接器名称。在你的例子中,你得到了 "connect-connector1" 和 "connect-connector2" 基于你的连接器名称。这也意味着两个连接器在同一组中的唯一方法是......如果它们具有相同的名称。但名称是唯一的,因此同一组中不能有两个连接器。原因是...

  3. 连接器本身并不真正获取事件,它们只是启动一堆任务。每个任务都有属于连接器消费者组的消费者,每个任务将独立处理主题和分区的子集。所以在同一个组中有两个连接器,基本上意味着他们的所有任务都属于同一组 - 那么为什么需要两个连接器?只需为该连接器配置更多主题和更多任务,一切就绪。

唯一的例外是您使用的连接器未正确使用任务或将您限制为只能执行一项任务。在那种情况下——要么他们有充分的理由,要么(更有可能)有人需要改进他们的连接器...

您可以将 consumer.group.id 设置为 Kafka Connect 可以采用的值,并将其用作整个应用程序的 group.id

优势:您的应用程序连接到一个消费者组 缺点:你应该小心 Consumer Group 配置。让它们看起来都一样