如何使用confluent-kafka-python消费最近N天的消息?
How to consume messages in last N days using confluent-kafka-python?
这个问题和类似,只是我想知道如何在Confluent的官方Python Kafka客户端中做。
我查看了 Consumer.offsets_for_times 函数,但我对它在 TopicPartition.offset
字段中接受时间戳感到困惑。
offset
如何等同于时间戳?
该方法不接受时间戳;仅要为其查找时间戳的分区。
也许你是指 timeout 参数?
我最近为 $work 做了这个。您需要获取 offsets_for_times()
的结果,然后 assign()
将列表提供给您的消费者,然后调用 consume()
。重要的是,不要 subscribe()
进入主题。 (参见 Eden Hill 对 https://github.com/confluentinc/confluent-kafka-python/issues/373 的评论)。
你是对的,这个函数的文档在定义时间戳和偏移量时有些混乱。
更新以回答后续问题:
与的区别在于
topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
你会这样做:
whents = datetime.fromisoformat("2022-01-01T12:34:56.000")
whenms = int(whents) * 1000 # to get milliseconds
topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]
这个问题和
我查看了 Consumer.offsets_for_times 函数,但我对它在 TopicPartition.offset
字段中接受时间戳感到困惑。
offset
如何等同于时间戳?
该方法不接受时间戳;仅要为其查找时间戳的分区。
也许你是指 timeout 参数?
我最近为 $work 做了这个。您需要获取 offsets_for_times()
的结果,然后 assign()
将列表提供给您的消费者,然后调用 consume()
。重要的是,不要 subscribe()
进入主题。 (参见 Eden Hill 对 https://github.com/confluentinc/confluent-kafka-python/issues/373 的评论)。
你是对的,这个函数的文档在定义时间戳和偏移量时有些混乱。
更新以回答后续问题:
与
topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
你会这样做:
whents = datetime.fromisoformat("2022-01-01T12:34:56.000")
whenms = int(whents) * 1000 # to get milliseconds
topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]