如何将主题从一个经纪人转移到卡夫卡的另一个经纪人?

How to move a topic from one broker to another broker in kafka?

我首先尝试查看是否可以在特定代理中创建主题。但看起来这是不可能的。即使我在 bootstrap

中提到经纪人主机
    admin_client = AdminClient({
    "bootstrap.servers": "xxx1.com:9092,xxx2.com:9092"
})

futmap=admin_client.create_topics(topic_list)

该程序任意选择我拥有的 5 个经纪人之一作为该主题的领导经纪人。我试图理解为什么会这样。

我也在尝试看看是否可以将主题负责人重新分配给另一个经纪人。我知道可以通过 kafka-reassign-partitions 命令行脚本实现,但我想使用 python 和 confluent-Kafka 包以编程方式进行。是否可以以编程方式执行此操作。在confluent-Kafka package

的ADMINclass中没有找到reassign partition function

谢谢

我终于找到了解决方案,汇合的 Kafka python 包的文档不足以解决这个问题。但是开源的一个好处是你可以阅读代码并弄明白。因此,要在特定代理中创建主题,我必须编写如下创建主题代码。请注意,我使用 replica_assignment 而不是 replication_factor。这两者是相互排斥的。如果你使用replication_factor,分区将由Kafka分配,你可以通过replica_assignment控制分配。但是,我确信在 rebalancing/re-assigning 个分区的情况下,这将被重新分配。但这也可以通过 on_revoke 事件来处理。但就目前而言,这对我有用。

def createTopic(admin_client,topics):
    #topic_name=topics
    topic_name = ['rajib1_test_xxx_topic']
    replica_assignment = [[262, 261]]
    topic_list = [NewTopic(topic, num_partitions=1, replica_assignment=replica_assignment) for topic in topic_name]
futmap=admin_client.create_topics(topic_list)
# Wait for each operation to finish.
for topic, f in futmap.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
#return futmap

您还可以使用 Kafka 附带的 kafka-reassign-partitions.sh 工具将一个主题的副本更改为另一个代理。

例如,如果你想让你的(在这个例子中是单一复制和单一分区的)主题“test”位于代理“1”上,你可以先定义一个计划(名为replicachange.json):

{
"partitions":
  [{"topic": "test", "partition": 0,
    "replicas": [
       1
    ]
  }],
"version":1
}

然后使用:

执行它
kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute \
--reassignment-json-file replicachange.json