confluent-python kafka 生产者发送回调 message.offset() returns 0

confluent-python kafka producer send callback message.offset() returns 0

Producer.send 回调提供了一个消息对象。 message.offset() 通常 returns 0 似乎是错误。

这是使用: confluent-kafka python 库版本 0.11.0 librdkafka:稳定的 0.11.0(瓶装),HEAD。通过 Mac OS Homebrew 安装

下面是简单的测试程序:

import confluent_kafka
import timeit


def delivery_callback(error, message):
    print("delivery_callback. error={}. message={}".format(error, message))
    print("message.topic={}".format(message.topic()))
    print("message.timestamp={}".format(message.timestamp()))
    print("message.key={}".format(message.key()))
    print("message.value={}".format(message.value()))
    print("message.partition={}".format(message.partition()))
    print("message.offset={}".format(message.offset()))


def produce_string_messages(kafka_producer, topic_name, num_messages):
    start_time = timeit.default_timer()

    for i in range(num_messages):
        kafka_producer.produce(topic_name, value="cf-k test. v{}".format(i), on_delivery=delivery_callback)

    elapsed = timeit.default_timer() - start_time
    print("completed producing messages. They are queued for delivery. elapsed={}. elapsed/msg={}".format(elapsed, elapsed / num_messages))


if __name__ == "__main__":
    print("starting")

    conf = {
        'bootstrap.servers': "kafka-broker-1:9092"
    }

    kafka_producer = confluent_kafka.Producer(conf)

    print("opened KafkaProducer")
    produce_string_messages(kafka_producer, "my-string-topic", 3)

    print("flushing...")
    kafka_producer.flush()

    print("exiting")

产生:

starting
opened KafkaProducer
completed producing messages. They are queued for delivery. elapsed=0.000994920730591. elapsed/msg=0.00033164024353
flushing...
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0>
message.topic=my-string-topic
message.timestamp=(1, 1508451238822L)
message.key=None
message.value=cf-k test. v0
message.partition=0
message.offset=0
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0>
message.topic=my-string-topic
message.timestamp=(1, 1508451238822L)
message.key=None
message.value=cf-k test. v1
message.partition=0
message.offset=0
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0>
message.topic=my-string-topic
message.timestamp=(1, 1508451238822L)
message.key=None
message.value=cf-k test. v2
message.partition=0
message.offset=24
exiting

请注意前两条消息的 message.offset() 为零,第三条消息为非零。如果我再次 运行 这个发送三个消息的测试程序,第三个 message.offset 递增 3。这看起来只是一个错误,其中 message.offset() 经常错误地 returns 0.

出于性能[1] 的原因,传递报告只为生成的批次中的最后一条消息提供有效偏移量。这可以通过将 produce.offset.report 主题级配置 属性 设置为 true 来更改,以便为批处理中的所有消息提供适当的偏移量,如下所示:

p = confluent_kafka.Producer({'bootstrap.servers': ..., 
                              'default.topic.config': { 'produce.offset.report': True } })

我们将在 Python 客户端的未来版本中将默认值更改为 True。

[1]:它避免了批处理中消息的线性扫描,但性能影响微乎其微且在 Python 领域无关紧要,因此无需担心。