如何使用confluent-kafka-python消费最近N天的消息?

How to consume messages in last N days using confluent-kafka-python?

这个问题和类似,只是我想知道如何在Confluent的官方Python Kafka客户端中做。

我查看了 Consumer.offsets_for_times 函数,但我对它在 TopicPartition.offset 字段中接受时间戳感到困惑。

offset 如何等同于时间戳?

该方法不接受时间戳;仅要为其查找时间戳的分区。

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.TopicPartition.TopicPartition

也许你是指 timeout 参数?

我最近为 $work 做了这个。您需要获取 offsets_for_times() 的结果,然后 assign() 将列表提供给您的消费者,然后调用 consume()。重要的是,不要 subscribe()进入主题。 (参见 Eden Hill 对 https://github.com/confluentinc/confluent-kafka-python/issues/373 的评论)。

你是对的,这个函数的文档在定义时间戳和偏移量时有些混乱。

更新以回答后续问题:

的区别在于

topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]

你会这样做:

whents = datetime.fromisoformat("2022-01-01T12:34:56.000")
whenms = int(whents) * 1000   # to get milliseconds

topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]