Apache Kafka 不平衡集群逻辑
Apache Kafka Unbalanced Cluster Logic
我们有一个要求,我们需要通过将 3 个代理集群的流量分配分配给 Kafka 集群,例如 80% 到代理 1 15% 到代理 2 5% 到代理 3 并发送消息根据经纪人流量分配给经纪人的主题。
为了使用 kafka-python 在 python 编程中实现此逻辑,我们从主函数中调用生成不平衡消息函数。下面提供了实现逻辑的代码示例:-
主要功能
def mf():
.
.
.
# create a topic if the topic doesn't exists. Tps_crtn will create new topic if no existing topics found else, will send messages to the existing topics, as usual.
tpc_list = tps_crtn(base_topic_name=bt, no_of_topics=int(ntp),
topic_partn=int(ptp),
repicas_per_partn=int(rpp))
#traffic distribution list
dl = [80,15,5]
while True:
for ix, topic in enumerate (tpc_list):
produce_unbalanced_message(topic_name=topic,
no_of_msgs=int(round((int(nm) * (float(dl[ix])/100.0)))),
max_wait_time=float(mwt)
if __name__ == "__main__":
mf()
main函数调用下面提到的Producer send函数,向主题列表中的每个主题发送消息。
不平衡产生消息函数
def produce_unbalanced_message(topic_name='test-topic',
no_of_msgs=-1,
max_wait_time=2):
kafka_admin_client: KafkaAdminClient = KafkaAdminClient(
bootstrap_servers='10.22.151.16:9100'
)
.
.
# List of all node ids in the cluster
LOG.info("Fetch the existing Kafka node list")
nodeids: List[int] = [node.nodeId for node in kafka_admin_client._client.cluster.brokers()]
for n in nodeids:
print(n)
.
.
.
# sending unbalanced messages to Kafka
producer.send(topic_name,
key=key,
value=message)
.
.
根据要求,消息应该根据broker nos和相应的流量分布列表发送,而不是主题列表。我们从 produce_unbalanced_message 函数中的 nodeids 列表中获取的经纪人编号。
但是,在按照流量分布列表参数针对超过三个的主题计数测试此代码时,我们得到 - 索引超出范围错误。这是因为我们一增加它们的值就在主题列表中,流量列表分布值不匹配,因为它们是根据代理设置的。
任何人都可以建议尝试进行哪些更改,以便根据从 nodeids 列表和相应的流量分配列表中获得的代理编号而不是根据主题列表来发送消息?
the traffic list distribution values are not matching as they are set according to the broker
您已将两个列表关联起来。如果您将有 more/less 个主题而不是“分发列表”,那么您不能使用一个列表的索引来访问另一个列表。
IMO,如果你有一个静态定义的 if 块,它会更具可读性,因为实际上不需要获取主题列表来创建一个不平衡的集群。
如果您想要 100% 的分布,只需使用随机范围
import random
while True:
value = random.random()
topic = None
if 0 <= value < 0.80:
topic = 't1'
elif 0.80 <= value < 0.95:
topic = 't2'
else:
topic = 't3'
print('Produce to topic ' + topic)
如果您确实希望它“不平衡”,您将需要验证主题是否只有一个副本并且也由不同的代理托管
我们有一个要求,我们需要通过将 3 个代理集群的流量分配分配给 Kafka 集群,例如 80% 到代理 1 15% 到代理 2 5% 到代理 3 并发送消息根据经纪人流量分配给经纪人的主题。
为了使用 kafka-python 在 python 编程中实现此逻辑,我们从主函数中调用生成不平衡消息函数。下面提供了实现逻辑的代码示例:-
主要功能
def mf():
.
.
.
# create a topic if the topic doesn't exists. Tps_crtn will create new topic if no existing topics found else, will send messages to the existing topics, as usual.
tpc_list = tps_crtn(base_topic_name=bt, no_of_topics=int(ntp),
topic_partn=int(ptp),
repicas_per_partn=int(rpp))
#traffic distribution list
dl = [80,15,5]
while True:
for ix, topic in enumerate (tpc_list):
produce_unbalanced_message(topic_name=topic,
no_of_msgs=int(round((int(nm) * (float(dl[ix])/100.0)))),
max_wait_time=float(mwt)
if __name__ == "__main__":
mf()
main函数调用下面提到的Producer send函数,向主题列表中的每个主题发送消息。
不平衡产生消息函数
def produce_unbalanced_message(topic_name='test-topic',
no_of_msgs=-1,
max_wait_time=2):
kafka_admin_client: KafkaAdminClient = KafkaAdminClient(
bootstrap_servers='10.22.151.16:9100'
)
.
.
# List of all node ids in the cluster
LOG.info("Fetch the existing Kafka node list")
nodeids: List[int] = [node.nodeId for node in kafka_admin_client._client.cluster.brokers()]
for n in nodeids:
print(n)
.
.
.
# sending unbalanced messages to Kafka
producer.send(topic_name,
key=key,
value=message)
.
.
根据要求,消息应该根据broker nos和相应的流量分布列表发送,而不是主题列表。我们从 produce_unbalanced_message 函数中的 nodeids 列表中获取的经纪人编号。
但是,在按照流量分布列表参数针对超过三个的主题计数测试此代码时,我们得到 - 索引超出范围错误。这是因为我们一增加它们的值就在主题列表中,流量列表分布值不匹配,因为它们是根据代理设置的。
任何人都可以建议尝试进行哪些更改,以便根据从 nodeids 列表和相应的流量分配列表中获得的代理编号而不是根据主题列表来发送消息?
the traffic list distribution values are not matching as they are set according to the broker
您已将两个列表关联起来。如果您将有 more/less 个主题而不是“分发列表”,那么您不能使用一个列表的索引来访问另一个列表。
IMO,如果你有一个静态定义的 if 块,它会更具可读性,因为实际上不需要获取主题列表来创建一个不平衡的集群。
如果您想要 100% 的分布,只需使用随机范围
import random
while True:
value = random.random()
topic = None
if 0 <= value < 0.80:
topic = 't1'
elif 0.80 <= value < 0.95:
topic = 't2'
else:
topic = 't3'
print('Produce to topic ' + topic)
如果您确实希望它“不平衡”,您将需要验证主题是否只有一个副本并且也由不同的代理托管