如何将主题从一个经纪人转移到卡夫卡的另一个经纪人?
How to move a topic from one broker to another broker in kafka?
我首先尝试查看是否可以在特定代理中创建主题。但看起来这是不可能的。即使我在 bootstrap
中提到经纪人主机
admin_client = AdminClient({
"bootstrap.servers": "xxx1.com:9092,xxx2.com:9092"
})
futmap=admin_client.create_topics(topic_list)
该程序任意选择我拥有的 5 个经纪人之一作为该主题的领导经纪人。我试图理解为什么会这样。
我也在尝试看看是否可以将主题负责人重新分配给另一个经纪人。我知道可以通过 kafka-reassign-partitions 命令行脚本实现,但我想使用 python 和 confluent-Kafka 包以编程方式进行。是否可以以编程方式执行此操作。在confluent-Kafka package
的ADMINclass中没有找到reassign partition function
谢谢
我终于找到了解决方案,汇合的 Kafka python 包的文档不足以解决这个问题。但是开源的一个好处是你可以阅读代码并弄明白。因此,要在特定代理中创建主题,我必须编写如下创建主题代码。请注意,我使用 replica_assignment 而不是 replication_factor。这两者是相互排斥的。如果你使用replication_factor,分区将由Kafka分配,你可以通过replica_assignment控制分配。但是,我确信在 rebalancing/re-assigning 个分区的情况下,这将被重新分配。但这也可以通过 on_revoke 事件来处理。但就目前而言,这对我有用。
def createTopic(admin_client,topics):
#topic_name=topics
topic_name = ['rajib1_test_xxx_topic']
replica_assignment = [[262, 261]]
topic_list = [NewTopic(topic, num_partitions=1, replica_assignment=replica_assignment) for topic in topic_name]
futmap=admin_client.create_topics(topic_list)
# Wait for each operation to finish.
for topic, f in futmap.items():
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
#return futmap
您还可以使用 Kafka 附带的 kafka-reassign-partitions.sh
工具将一个主题的副本更改为另一个代理。
例如,如果你想让你的(在这个例子中是单一复制和单一分区的)主题“test”位于代理“1”上,你可以先定义一个计划(名为replicachange.json):
{
"partitions":
[{"topic": "test", "partition": 0,
"replicas": [
1
]
}],
"version":1
}
然后使用:
执行它
kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute \
--reassignment-json-file replicachange.json
我首先尝试查看是否可以在特定代理中创建主题。但看起来这是不可能的。即使我在 bootstrap
中提到经纪人主机 admin_client = AdminClient({
"bootstrap.servers": "xxx1.com:9092,xxx2.com:9092"
})
futmap=admin_client.create_topics(topic_list)
该程序任意选择我拥有的 5 个经纪人之一作为该主题的领导经纪人。我试图理解为什么会这样。
我也在尝试看看是否可以将主题负责人重新分配给另一个经纪人。我知道可以通过 kafka-reassign-partitions 命令行脚本实现,但我想使用 python 和 confluent-Kafka 包以编程方式进行。是否可以以编程方式执行此操作。在confluent-Kafka package
的ADMINclass中没有找到reassign partition function谢谢
我终于找到了解决方案,汇合的 Kafka python 包的文档不足以解决这个问题。但是开源的一个好处是你可以阅读代码并弄明白。因此,要在特定代理中创建主题,我必须编写如下创建主题代码。请注意,我使用 replica_assignment 而不是 replication_factor。这两者是相互排斥的。如果你使用replication_factor,分区将由Kafka分配,你可以通过replica_assignment控制分配。但是,我确信在 rebalancing/re-assigning 个分区的情况下,这将被重新分配。但这也可以通过 on_revoke 事件来处理。但就目前而言,这对我有用。
def createTopic(admin_client,topics):
#topic_name=topics
topic_name = ['rajib1_test_xxx_topic']
replica_assignment = [[262, 261]]
topic_list = [NewTopic(topic, num_partitions=1, replica_assignment=replica_assignment) for topic in topic_name]
futmap=admin_client.create_topics(topic_list)
# Wait for each operation to finish.
for topic, f in futmap.items():
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
#return futmap
您还可以使用 Kafka 附带的 kafka-reassign-partitions.sh
工具将一个主题的副本更改为另一个代理。
例如,如果你想让你的(在这个例子中是单一复制和单一分区的)主题“test”位于代理“1”上,你可以先定义一个计划(名为replicachange.json):
{
"partitions":
[{"topic": "test", "partition": 0,
"replicas": [
1
]
}],
"version":1
}
然后使用:
执行它kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute \
--reassignment-json-file replicachange.json