How to overwrite default config values in kafka for confluent-python client?
As a beginner, I'm exploring Apache Kafka and the confluent-kafka-python client. When I send a simple message from the producer, the consumer is able to consume it successfully. I then thought I'd try sending an image as the payload. So, going ahead with a 1MB (png) image, my producer fails to produce the message. The error I'm getting is:
p.produce('mytopic', callback=delivery_report, key='hello', value=str_value)
cimpl.KafkaException: KafkaError{code=MSG_SIZE_TOO_LARGE,val=10,str="Unable to produce message: Broker: Message size too large"}
I did some Googling and found this:
How can I send large messages with Kafka (over 15MB)?
So I modified my server.properties (broker side) as follows:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
max.message.bytes=1048576
message.max.bytes=1048576
replica.fetch.max.bytes=1048576
But I still couldn't resolve the issue.
producer.py
from confluent_kafka import Producer
import base64
import time

# some_data_source = ['hey', 'hi']

with open("1mb.png", "rb") as imageFile:
    str_value = base64.b64encode(imageFile.read())

p = Producer({'bootstrap.servers': 'localhost:9092', 'compression.type': 'snappy'})


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


for _ in range(2):
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', callback=delivery_report, key='hello', value=str_value)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
consumer.py
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
Do I need to add any other parameters, or am I missing something in the configuration? Any help would be appreciated.
Thanks
It doesn't look like you've actually changed the broker defaults; 1048576 bytes is still roughly 1MB.
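For example, a server.properties fragment that actually raises the limits might look like the following (15728640 bytes, i.e. 15MB, mirrors the linked answer; pick whatever ceiling you need). Note that `max.message.bytes` is the topic-level name of this setting; on the broker the property is `message.max.bytes`:

```properties
# Broker-side limits; values are in bytes. The default is ~1MB, so setting
# 1048576 changes nothing. 15728640 = 15MB (illustrative value).
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
```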
For your client error, you need to add message.max.bytes to the Producer configuration.
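A minimal sketch of that producer config, assuming the broker limit has been raised to match (the 15MB value is illustrative; librdkafka's client-side default is 1000000 bytes):

```python
# Producer settings with the client-side message size limit raised; pass this
# dict straight to confluent_kafka.Producer(...) as in your producer.py.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'snappy',
    # Must be large enough for your payload, and must not exceed the
    # broker's message.max.bytes. 15728640 = 15MB (illustrative).
    'message.max.bytes': 15728640,
}
# p = Producer(producer_conf)  # then produce/poll/flush as before
```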
If you need any other client properties, such as the consumer max fetch bytes, those are documented here:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
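On the consumer side, a sketch of the matching change, assuming messages exceed the ~1MB per-partition fetch default (property name taken from librdkafka's CONFIGURATION.md; value illustrative):

```python
# Consumer settings for large messages; pass this dict to
# confluent_kafka.Consumer(...) as in your consumer.py.
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    # Per-partition fetch ceiling (an alias of fetch.message.max.bytes);
    # it must be at least as large as the biggest message you expect.
    'max.partition.fetch.bytes': 15728640,
}
# c = Consumer(consumer_conf)  # then subscribe/poll as before
```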
Overall, the recommendation would be to upload your images to a centralized file store, then send their URI locations as plain strings through Kafka. This will increase the throughput of your brokers and reduce storage needs, especially if you're sending/copying the same image data across multiple topics.
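That pattern can be sketched like this; `upload_image` is a hypothetical stand-in for a real storage client (S3, a web server, NFS, etc.), and the in-memory dict just stands in for the store:

```python
def upload_image(data, name, store):
    """Hypothetical uploader: a real version would call an object-store
    client. It stores the bytes and returns a URI the consumer can use
    to fetch the image later."""
    uri = 'file-store://' + name
    store[uri] = data
    return uri

# The Kafka message is now a short URI string instead of megabytes
# of base64-encoded image data:
store = {}
uri = upload_image(b'\x89PNG...fake image bytes...', '1mb.png', store)
payload = uri.encode('utf-8')  # a few dozen bytes, well under any broker limit
# p.produce('mytopic', key='hello', value=payload)
```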