Confluent Kafka Python library configure producer for bulk msg
Confluent Kafka Python library configure producer for bulk msg
我需要将 Kafka 生产者设置为一批发送 500 条消息,而不是逐条发送消息,而是批量导入。我检查了 https://github.com/dpkp/kafka-python/issues/479 并尝试了 producer.send_messages(topic, *message)
但它因错误而失败:
> producer.send_messages('event_connector_mt', *load_entries)
E AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'
我也试过像
producer.produce(topic, *message)
失败:
producer.produce('event_connector_mt', *load_entries)
E TypeError: function takes at most 8 arguments (501 given)
所以我深入挖掘,发现我必须在生产者配置中将类型设置为异步并且 batch.size 比默认值大,但是当我尝试像这样的配置时:
from confluent_kafka import Consumer, Producer
producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
'queue.buffering.max.messages': 1000000,
'batch.num.messages': 500,
'batch.size': 19999,
'producer.type': 'async'
})
失败:
E KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}
batch.size同样的错误
你能告诉我在哪里以及如何设置异步和批量大小或任何其他方式将批量消息传递给 Kafka 0.9.3.1
默认情况下,所有生产者都是异步的。底层 librdkafka 库不支持 Producer.type 和 batch.size 配置。
因此,请使用可用配置 batch.num.messages 或 message.max.bytes。
您混淆了名为 "confluent-kafka-python"
的 Confluent python 客户端
http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/
与"kafka-python"客户
https://github.com/dpkp/kafka-python
这是两个具有不同 API 的不同客户端。
我需要将 Kafka 生产者设置为一批发送 500 条消息,而不是逐条发送消息,而是批量导入。我检查了 https://github.com/dpkp/kafka-python/issues/479 并尝试了 producer.send_messages(topic, *message)
但它因错误而失败:
> producer.send_messages('event_connector_mt', *load_entries)
E AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'
我也试过像
producer.produce(topic, *message)
失败:
producer.produce('event_connector_mt', *load_entries)
E TypeError: function takes at most 8 arguments (501 given)
所以我深入挖掘,发现我必须在生产者配置中将类型设置为异步并且 batch.size 比默认值大,但是当我尝试像这样的配置时:
from confluent_kafka import Consumer, Producer
producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
'queue.buffering.max.messages': 1000000,
'batch.num.messages': 500,
'batch.size': 19999,
'producer.type': 'async'
})
失败:
E KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}
batch.size同样的错误 你能告诉我在哪里以及如何设置异步和批量大小或任何其他方式将批量消息传递给 Kafka 0.9.3.1
默认情况下,所有生产者都是异步的。底层 librdkafka 库不支持 Producer.type 和 batch.size 配置。
因此,请使用可用配置 batch.num.messages 或 message.max.bytes。
您混淆了名为 "confluent-kafka-python"
的 Confluent python 客户端http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/
与"kafka-python"客户
https://github.com/dpkp/kafka-python
这是两个具有不同 API 的不同客户端。