Confluent Kafka Python library configure producer for bulk ms​​g

Confluent Kafka Python library configure producer for bulk msg

我需要将 Kafka 生产者设置为一批发送 500 条消息,而不是逐条发送消息,而是批量导入。我检查了 https://github.com/dpkp/kafka-python/issues/479 并尝试了 producer.send_messages(topic, *message) 但它因错误而失败:

>       producer.send_messages('event_connector_mt', *load_entries)
E       AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'

我也试过像 producer.produce(topic, *message) 失败:

       producer.produce('event_connector_mt', *load_entries)
E       TypeError: function takes at most 8 arguments (501 given)

所以我深入挖掘,发现我必须在生产者配置中将类型设置为异步并且 batch.size 比默认值大,但是当我尝试像这样的配置时:

from confluent_kafka import Consumer, Producer       

producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
                       'queue.buffering.max.messages': 1000000,
                       'batch.num.messages': 500,
                       'batch.size': 19999,
                       'producer.type': 'async'
                       })

失败:

E       KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}

batch.size同样的错误 你能告诉我在哪里以及如何设置异步和批量大小或任何其他方式将批量消息传递给 Kafka 0.9.3.1

默认情况下,所有生产者都是异步的。底层 librdkafka 库不支持 Producer.type 和 batch.size 配置。

因此,请使用可用配置 batch.num.messages 或 message.max.bytes。

您混淆了名为 "confluent-kafka-python"

的 Confluent python 客户端

http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/

与"kafka-python"客户

https://github.com/dpkp/kafka-python

这是两个具有不同 API 的不同客户端。