How to automatically generate partition keys for messages (Kafka + Python)?

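I am trying to generate a key for every message in Kafka. To do this, I want to create a key generator that concatenates the first two characters of the topic name with the tweet ID.

Here is an example of a message sent to Kafka: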

{"data":{"created_at":"2022-03-18T09:51:12.000Z","id":"1504757303811231755","text":"@Danielog111 @POTUS @NATO @UNPeacekeeping @UN Yes! Not to minimize Ukraine at all, but to bring attention to a horrific crisis and Tigrayan genocide that targets 7M people, longer time frame, and is largely unacknowledged by western news agencies. And people are being eaten-literally! @maddow @JoyAnnReid help Ethiopians!"},"matching_rules":[{"id":"1502932028618072070","tag":"NATO"},{"id":"1502932021731115013","tag":"Biden"}]}
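For the sample message above, the intended scheme yields one key per matching rule, because each rule's tag is used as the topic name. A minimal sketch, assuming UTF-8 encoding and a hypothetical helper named make_partition_key (the tweet text is shortened here):

```python
import json

# The sample message from above, with the tweet text shortened.
raw = b'{"data":{"created_at":"2022-03-18T09:51:12.000Z","id":"1504757303811231755","text":"..."},"matching_rules":[{"id":"1502932028618072070","tag":"NATO"},{"id":"1502932021731115013","tag":"Biden"}]}'


def make_partition_key(topic_name: str, tweet_id: str) -> bytes:
    """Hypothetical helper: first two characters of the topic + tweet ID, as bytes."""
    # Build the whole key as a str first, then encode it once at the end.
    return f"{topic_name[:2]}{tweet_id}".encode("utf-8")


message = json.loads(raw)  # json.loads accepts bytes directly
tweet_id = message["data"]["id"]

# One key per matching rule, since each rule's tag is used as the topic name.
keys = {rule["tag"]: make_partition_key(rule["tag"], tweet_id)
        for rule in message["matching_rules"]}
print(keys)  # {'NATO': b'NA1504757303811231755', 'Biden': b'Bi1504757303811231755'}
```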

Here is my modified code that tries to generate the partition key (I'm using PyKafka):

from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient


# Getting credentials (load_dotenv() copies the values in .env into os.environ):

load_dotenv()
BEARER_TOKEN = os.getenv("BEARER_TOKEN")

# Setting up pykafka:

def get_kafka_client():
    return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')

def send_message(data, name_topic, id):    
    client = get_kafka_client()
    topic = client.topics[name_topic]
    producer = topic.get_sync_producer()
    producer.produce(data, partition_key=f"{name_topic[:2]}{id}")

# Creating a Twitter stream listener:

class Listener(tweepy.StreamingClient):

    def on_data(self, data):
        print(data)
        message = json.loads(data)
        for rule in message['matching_rules']:
            send_message(data, rule['tag'], message['data']['id'].encode())
        return True
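
    # tweepy's StreamingClient calls on_request_error() for non-200 HTTP
    # responses; a method named on_error() is never invoked.
    def on_request_error(self, status_code):
        print(status_code)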

# Start streaming:

Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])

This is the error I get:

File "/Users/mac/.local/share/virtualenvs/tweepy_step-Ck3DvAWI/lib/python3.9/site-packages/pykafka/producer.py", line 372, in produce
raise TypeError("Producer.produce accepts a bytes object as partition_key, "
TypeError: ("Producer.produce accepts a bytes object as partition_key, but it got '%s'", <class 'str'>)
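The TypeError comes from the f-string: it always produces a str, and a bytes value interpolated into it shows up as its repr. A short sketch reproducing the key the code above actually builds, using the sample tweet ID and NATO as the topic:

```python
name_topic = "NATO"
tweet_id = "1504757303811231755"

# What the code above does: encode the ID first, then format it into an f-string.
# The result is still a str, with the bytes repr (b'...') embedded in it.
broken_key = f"{name_topic[:2]}{tweet_id.encode()}"
print(type(broken_key).__name__, broken_key)  # str NAb'1504757303811231755'

# Formatting first and encoding the finished string yields bytes instead.
fixed_key = f"{name_topic[:2]}{tweet_id}".encode()
print(type(fixed_key).__name__, fixed_key)  # bytes b'NA1504757303811231755'
```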

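I also tried not encoding it, and tried extracting the ID from the raw data (as bytes), but neither option worked.

I found the mistake: I should encode the whole partition key, not the tweet ID. An f-string always returns a str, so interpolating an already-encoded ID only embeds its repr (for example NAb'1504757303811231755') and the key is still a str. Encoding the finished string gives the bytes that PyKafka expects: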

def send_message(data, name_topic, id):
    client = get_kafka_client()
    topic = client.topics[name_topic]
    producer = topic.get_sync_producer()
    # Build the complete key as a str, then encode it once: PyKafka needs bytes.
    producer.produce(data, partition_key=f"{name_topic[:2]}{id}".encode())

# Creating a Twitter stream listener:

class Listener(tweepy.StreamingClient):

    def on_data(self, data):
        print(data)
        message = json.loads(data)
        for rule in message['matching_rules']:
            send_message(data, rule['tag'], message['data']['id'])
        return True
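
    # tweepy's StreamingClient calls on_request_error() for non-200 HTTP
    # responses; a method named on_error() is never invoked.
    def on_request_error(self, status_code):
        print(status_code)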
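The send_message function above still creates a new KafkaClient and producer for every message. A minimal sketch of one alternative, assuming you want to reuse one producer per topic: KeyedSender and FakeProducer are hypothetical names, and FakeProducer stands in for a PyKafka producer so the sketch runs without a broker. One more point worth checking in the PyKafka documentation: as far as I know, its producers use a random partitioner by default, so partition_key only decides the partition when you pass a key-based partitioner such as pykafka.partitioners.hashing_partitioner. The commented wiring at the bottom reflects that assumption.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class KeyedSender:
    """Hypothetical helper: reuses one producer per topic and builds the key."""

    def __init__(self, producer_factory: Callable[[str], Any]):
        # producer_factory(topic_name) must return an object with produce() and stop().
        self._factory = producer_factory
        self._producers: Dict[str, Any] = {}

    def send(self, data: bytes, topic_name: str, tweet_id: str) -> bytes:
        # Create the producer for a topic only the first time it is needed.
        if topic_name not in self._producers:
            self._producers[topic_name] = self._factory(topic_name)
        # Same scheme as the answer: first two characters of the topic + tweet ID.
        key = f"{topic_name[:2]}{tweet_id}".encode("utf-8")
        self._producers[topic_name].produce(data, partition_key=key)
        return key

    def close(self) -> None:
        # Stop every cached producer (PyKafka producers expose stop()).
        for producer in self._producers.values():
            producer.stop()
        self._producers.clear()


class FakeProducer:
    """Stand-in that records calls instead of talking to a Kafka broker."""

    def __init__(self, topic_name: str):
        self.topic_name = topic_name
        self.sent: List[Tuple[bytes, Optional[bytes]]] = []
        self.stopped = False

    def produce(self, message: bytes, partition_key: Optional[bytes] = None) -> None:
        self.sent.append((message, partition_key))

    def stop(self) -> None:
        self.stopped = True


# Assumed real wiring with PyKafka (not executed here):
#   from pykafka import KafkaClient
#   from pykafka.partitioners import hashing_partitioner
#   client = KafkaClient(hosts="localhost:9092,localhost:9093,localhost:9094")
#   sender = KeyedSender(lambda name: client.topics[name].get_sync_producer(
#       partitioner=hashing_partitioner))

created: List[FakeProducer] = []


def fake_factory(topic_name: str) -> FakeProducer:
    producer = FakeProducer(topic_name)
    created.append(producer)
    return producer


sender = KeyedSender(fake_factory)
sender.send(b'{"data": {}}', "NATO", "1504757303811231755")
sender.send(b'{"data": {}}', "NATO", "1504757303811231756")
sender.send(b'{"data": {}}', "Biden", "1504757303811231755")
sender.close()
print(len(created))  # 2: one producer for NATO, one for Biden
print(created[0].sent[1][1])  # b'NA1504757303811231756'
```

In the Listener, on_data would then call sender.send(data, rule['tag'], message['data']['id']) instead of send_message, and sender.close() would run once the stream stops so the producers shut down cleanly.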