How to automatically generate partition keys for messages (Kafka + Python)?
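I'm trying to generate a key for each message in Kafka. To do that, I want to create a key generator that concatenates the first two characters of the topic name with the tweet ID.
This is an example of the messages sent to Kafka: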
{"data":{"created_at":"2022-03-18T09:51:12.000Z","id":"1504757303811231755","text":"@Danielog111 @POTUS @NATO @UNPeacekeeping @UN Yes! Not to minimize Ukraine at all, but to bring attention to a horrific crisis and Tigrayan genocide that targets 7M people, longer time frame, and is largely unacknowledged by western news agencies. And people are being eaten-literally! @maddow @JoyAnnReid help Ethiopians!"},"matching_rules":[{"id":"1502932028618072070","tag":"NATO"},{"id":"1502932021731115013","tag":"Biden"}]}
This is the code I modified to try to generate the partition key (I'm using PyKafka):
from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient

# Getting credentials:
BEARER_TOKEN = os.getenv("BEARER_TOKEN")

# Setting up pykafka:
def get_kafka_client():
    return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')

def send_message(data, name_topic, id):
    client = get_kafka_client()
    topic = client.topics[name_topic]
    producer = topic.get_sync_producer()
    producer.produce(data, partition_key=f"{name_topic[:2]}{id}")

# Creating a Twitter stream listener:
class Listener(tweepy.StreamingClient):
    def on_data(self, data):
        print(data)
        message = json.loads(data)
        for rule in message['matching_rules']:
            send_message(data, rule['tag'], message['data']['id'].encode())
        return True

    def on_error(self, status):
        print(status)

# Start streaming:
Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])
This is the error I get:
File "/Users/mac/.local/share/virtualenvs/tweepy_step-Ck3DvAWI/lib/python3.9/site-packages/pykafka/producer.py", line 372, in produce
raise TypeError("Producer.produce accepts a bytes object as partition_key, "
TypeError: ("Producer.produce accepts a bytes object as partition_key, but it got '%s'", <class 'str'>)
I also tried not encoding it, and tried taking the ID straight from the raw data (which is in bytes), but none of these options worked.
I found the error: I should have encoded the whole partition key instead of the JSON id:
def send_message(data, name_topic, id):
    client = get_kafka_client()
    topic = client.topics[name_topic]
    producer = topic.get_sync_producer()
    producer.produce(data, partition_key=f"{name_topic[:2]}{id}".encode())

# Creating a Twitter stream listener:
class Listener(tweepy.StreamingClient):
    def on_data(self, data):
        print(data)
        message = json.loads(data)
        for rule in message['matching_rules']:
            send_message(data, rule['tag'], message['data']['id'])
        return True

    def on_error(self, status):
        print(status)
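For anyone hitting the same TypeError, here is a small standalone sketch of why encoding only the ID did not help, and one way to isolate the key logic. The helper name make_partition_key is my own, not part of PyKafka, and the snippet deliberately leaves Kafka out so it runs on its own.

```python
def make_partition_key(topic_name, tweet_id):
    """Build the key as bytes: first two characters of the topic + tweet ID.

    Hypothetical helper for illustration; not a PyKafka function.
    """
    # Accept the ID as str or bytes so a stray .encode() upstream cannot
    # leak a "b'...'" repr into the key.
    if isinstance(tweet_id, bytes):
        tweet_id = tweet_id.decode("utf-8")
    return f"{topic_name[:2]}{tweet_id}".encode("utf-8")


topic = "NATO"
tweet_id = "1504757303811231755"

# Formatting bytes into an f-string gives a str that contains the bytes repr,
# so the key passed to produce() was still a str.
broken_key = f"{topic[:2]}{tweet_id.encode()}"
print(type(broken_key).__name__, broken_key)  # str NAb'1504757303811231755'

# Encoding the finished string gives the bytes object produce() asks for.
fixed_key = make_partition_key(topic, tweet_id)
print(type(fixed_key).__name__, fixed_key)    # bytes b'NA1504757303811231755'
```

With this in place, the call in send_message would become producer.produce(data, partition_key=make_partition_key(name_topic, id)).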