Kafka 生产者始终将消息发送到同一分区 (Kafka + Python)

Kafka producer always sends messages to the same partition (Kafka + Python)

我用 docker-compose 设置了一个 3 节点的 Kafka 集群,然后我创建了 5 个主题和 3 个分区,复制因子为 3。我将生产者设置为连接到每个节点的端口。

消息按顺序从一个地方发送到另一个地方(应该如此),但我在使用 UI 检查我的集群后意识到所有主题的所有消息都将发送到同一个分区(分区 # 2).

起初,我认为这可能与没有为消息设置任何分区键有关,所以我修改了我的脚本,为每条消息添加一个分区键(前两个字母的组合主题和推文的 ID 号,这种分区键格式是否有意义?)但问题仍然存在。

这是代码(它从 Twitter API v2 接收推文并与生产者发送消息):

from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient


# Getting credentials:

BEARER_TOKEN=os.getenv("BEARER_TOKEN")

# Setting up pykafka:

def get_kafka_client():
    return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')

def send_message(data, name_topic, id):    
    client = get_kafka_client()
    topic = client.topics[name_topic]
    producer = topic.get_sync_producer()
    producer.produce(data, partition_key=f"{name_topic[:2].upper()}{id}".encode())

# Creating a Twitter stream listener:

class Listener(tweepy.StreamingClient):

    def on_data(self, data):
        print(data)
        message = json.loads(data)
        for rule in message['matching_rules']:
            send_message(data, rule['tag'], message['data']['id'])
        return True
    
    def on_error(self, status):
        print(status)

# Start streaming:

Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])

我认为如果没有任何给定的密钥,它会开始随机向三个分区发送消息,但它也没有这样做。 我不知道问题出在哪里。

如果可能相关,所有 5 个主题都是在 docker 中创建的,使用此格式撰写:

docker-compose exec kafka1 kafka-topics --bootstrap-server kafka1:19092 --create --replication-factor 3 --partitions 3 --topic NoFlyZone

如果没有给出密钥,它应该发送到多个分区。如果你给了一个密钥,那么你 运行 计算相同分区哈希的风险,即使你有不同的密钥。

由于 PyKafka 不再维护

,您可能需要使用 kafka-pythonconfluent-kafka-python 等其他库进行测试