Kafka 的 Confluent Python API

Confluent Python API for Kafka

官方 Confluent Kafka 的基本用法出现错误 Python API:

我订阅了:

kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)

使用回调:

def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
    print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))

for topic_partition in topic_partitions_with_offsets:
    print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

产生控制台输出:

without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}

有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码可以完美地使用 Java API.

这是底层 C 库 librdkafka 中的错误。 见 upstream issue.

如果您想从存储的偏移量开始消费,您实际上不需要调用 position() 来检索它们,如果您不更改默认偏移量 -1001,客户端将自动执行此操作。