Python 生产到不同的 Kafka 分区
Python produce to different Kafka partition
我正在尝试通过经典的 Twitter 流示例来学习 Kafka。我正在尝试使用我的制作人将基于 2 个过滤器的推特数据流式传输到同一主题的不同分区。例如,带有 tracks='Google' 的 twitter 数据到一个分区,track='Apple' 到另一个分区。
class Producer(StreamListener):
def __init__(self, producer):
self.producer = producer
def on_data(self, data):
self.producer.send(topic_name, value=data)
return True
def on_error(self, error):
print(error)
twitter_stream = Stream(auth, Producer(producer))
twitter_stream.filter(track=["Google"])
如何添加另一个轨道并将该数据流式传输到另一个分区。
同样,我如何让我的消费者从特定分区消费。
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
Kafka 根据消息的键对数据进行分区。在您给定的代码中,您只将 value
传递给 Producer 消息,因此密钥将为空,因此所有分区之间将为 round-robin。
请参阅 Kafka 库的文档,了解如何为每条消息提供密钥
经过一些研究,我能够解决这个问题:
在生产者端,指定分区:
self.producer.send(topic_name, value=data,partition=0)
在消费端,
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer.assign([TopicPartition('trial', 0)])
我正在尝试通过经典的 Twitter 流示例来学习 Kafka。我正在尝试使用我的制作人将基于 2 个过滤器的推特数据流式传输到同一主题的不同分区。例如,带有 tracks='Google' 的 twitter 数据到一个分区,track='Apple' 到另一个分区。
class Producer(StreamListener):
def __init__(self, producer):
self.producer = producer
def on_data(self, data):
self.producer.send(topic_name, value=data)
return True
def on_error(self, error):
print(error)
twitter_stream = Stream(auth, Producer(producer))
twitter_stream.filter(track=["Google"])
如何添加另一个轨道并将该数据流式传输到另一个分区。
同样,我如何让我的消费者从特定分区消费。
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
Kafka 根据消息的键对数据进行分区。在您给定的代码中,您只将 value
传递给 Producer 消息,因此密钥将为空,因此所有分区之间将为 round-robin。
请参阅 Kafka 库的文档,了解如何为每条消息提供密钥
经过一些研究,我能够解决这个问题:
在生产者端,指定分区:
self.producer.send(topic_name, value=data,partition=0)
在消费端,
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer.assign([TopicPartition('trial', 0)])