Kafka 生产者始终将消息发送到同一分区 (Kafka + Python)
Kafka producer always sends messages to the same partition (Kafka + Python)
我用 docker-compose 设置了一个 3 节点的 Kafka 集群,然后我创建了 5 个主题和 3 个分区,复制因子为 3。我将生产者设置为连接到每个节点的端口。
消息按顺序从一个地方发送到另一个地方(应该如此),但我在使用 UI 检查我的集群后意识到所有主题的所有消息都将发送到同一个分区(分区 # 2).
起初,我认为这可能与没有为消息设置任何分区键有关,所以我修改了我的脚本,为每条消息添加一个分区键(前两个字母的组合主题和推文的 ID 号,这种分区键格式是否有意义?)但问题仍然存在。
这是代码(它从 Twitter API v2 接收推文并与生产者发送消息):
from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient
# Getting credentials:
BEARER_TOKEN=os.getenv("BEARER_TOKEN")
# Setting up pykafka:
def get_kafka_client():
return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')
def send_message(data, name_topic, id):
client = get_kafka_client()
topic = client.topics[name_topic]
producer = topic.get_sync_producer()
producer.produce(data, partition_key=f"{name_topic[:2].upper()}{id}".encode())
# Creating a Twitter stream listener:
class Listener(tweepy.StreamingClient):
def on_data(self, data):
print(data)
message = json.loads(data)
for rule in message['matching_rules']:
send_message(data, rule['tag'], message['data']['id'])
return True
def on_error(self, status):
print(status)
# Start streaming:
Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])
我认为如果没有任何给定的密钥,它会开始随机向三个分区发送消息,但它也没有这样做。
我不知道问题出在哪里。
如果可能相关,所有 5 个主题都是在 docker 中创建的,使用此格式撰写:
docker-compose exec kafka1 kafka-topics --bootstrap-server kafka1:19092 --create --replication-factor 3 --partitions 3 --topic NoFlyZone
如果没有给出密钥,它应该发送到多个分区。如果你给了一个密钥,那么你 运行 计算相同分区哈希的风险,即使你有不同的密钥。
由于 PyKafka
不再维护
,您可能需要使用 kafka-python
或 confluent-kafka-python
等其他库进行测试
我用 docker-compose 设置了一个 3 节点的 Kafka 集群,然后我创建了 5 个主题和 3 个分区,复制因子为 3。我将生产者设置为连接到每个节点的端口。
消息按顺序从一个地方发送到另一个地方(应该如此),但我在使用 UI 检查我的集群后意识到所有主题的所有消息都将发送到同一个分区(分区 # 2).
起初,我认为这可能与没有为消息设置任何分区键有关,所以我修改了我的脚本,为每条消息添加一个分区键(前两个字母的组合主题和推文的 ID 号,这种分区键格式是否有意义?)但问题仍然存在。
这是代码(它从 Twitter API v2 接收推文并与生产者发送消息):
from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient
# Getting credentials:
BEARER_TOKEN=os.getenv("BEARER_TOKEN")
# Setting up pykafka:
def get_kafka_client():
return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')
def send_message(data, name_topic, id):
client = get_kafka_client()
topic = client.topics[name_topic]
producer = topic.get_sync_producer()
producer.produce(data, partition_key=f"{name_topic[:2].upper()}{id}".encode())
# Creating a Twitter stream listener:
class Listener(tweepy.StreamingClient):
def on_data(self, data):
print(data)
message = json.loads(data)
for rule in message['matching_rules']:
send_message(data, rule['tag'], message['data']['id'])
return True
def on_error(self, status):
print(status)
# Start streaming:
Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])
我认为如果没有任何给定的密钥,它会开始随机向三个分区发送消息,但它也没有这样做。 我不知道问题出在哪里。
如果可能相关,所有 5 个主题都是在 docker 中创建的,使用此格式撰写:
docker-compose exec kafka1 kafka-topics --bootstrap-server kafka1:19092 --create --replication-factor 3 --partitions 3 --topic NoFlyZone
如果没有给出密钥,它应该发送到多个分区。如果你给了一个密钥,那么你 运行 计算相同分区哈希的风险,即使你有不同的密钥。
由于 PyKafka
不再维护
kafka-python
或 confluent-kafka-python
等其他库进行测试