Get results from Kafka for a specific period of time
Here is my code, which uses kafka-python.
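from datetime import datetime

from dateutil.relativedelta import relativedelta
from kafka import KafkaConsumer

now = datetime.now()
# months=1 subtracts one month; month=1 would set the month to January instead.
month_ago = now - relativedelta(months=1)
topic = 'some_topic_name'
consumer = KafkaConsumer(topic, bootstrap_servers=PROD_KAFKA_SERVER,
                         security_protocol=PROTOCOL,
                         group_id=GROUP_ID,
                         enable_auto_commit=False,
                         sasl_mechanism=SASL_MECHANISM, sasl_plain_username=SASL_USERNAME,
                         sasl_plain_password=SASL_PASSWORD)
for msg in consumer:
    print(msg)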
In the loop, I want to get only the messages from the topic between month_ago and now. How can I do that?
Thanks for your help!
Get the topic partitions assigned to your consumer:
partitions = consumer.assignment()
Get the offsets for those partitions by datetime:
month_ago_timestamp = int(month_ago.timestamp() * 1000)
partition_to_timestamp = {part: month_ago_timestamp for part in partitions}
mapping = consumer.offsets_for_times(partition_to_timestamp)
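The timestamps passed to offsets_for_times are in milliseconds since the Unix epoch, which is why the value above is multiplied by 1000. A minimal sketch of that conversion follows; the helper name to_epoch_millis is hypothetical and not part of kafka-python.

```python
from datetime import datetime, timezone


def to_epoch_millis(dt):
    """Convert a datetime to integer milliseconds since the Unix epoch."""
    # datetime.timestamp() returns seconds as a float. Naive datetimes are
    # interpreted in the local time zone, so timezone-aware values are safer.
    return int(dt.timestamp() * 1000)


start_ms = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(start_ms)  # 1704067200000
```

Note that datetime.now() returns a naive datetime, so the result depends on the time zone of the machine running the consumer.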
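Seek each partition to its offset:
for partition, offset_and_timestamp in mapping.items():
    # The value is None when no message exists at or after the timestamp.
    if offset_and_timestamp is not None:
        consumer.seek(partition, offset_and_timestamp[0])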
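Warning! The consumer can return None, set the offset to int zero, or block indefinitely in cases such as a missing topic, a missing partition, or messages without timestamps.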
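One way to guard against the None case is to decide the start offset for every partition before seeking. The sketch below is an illustration under assumptions: resolve_start_offsets is a hypothetical helper, the namedtuple only stands in for the entries kafka-python returns from offsets_for_times, and falling back to the end offset (so nothing is read from that partition) is one possible policy, not the only one.

```python
from collections import namedtuple

# Stand-in for the entries returned by offsets_for_times, used only for this demo.
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])


def resolve_start_offsets(offsets_for_times_result, end_offsets):
    """Map each partition to the offset reading should start from."""
    start_offsets = {}
    for partition, found in offsets_for_times_result.items():
        if found is None:
            # No message at or after the requested timestamp: start at the
            # end offset, which means there is nothing to read here.
            start_offsets[partition] = end_offsets[partition]
        else:
            start_offsets[partition] = found.offset
    return start_offsets


# Partitions are plain (topic, partition) tuples in this demo.
result = {("some_topic_name", 0): OffsetAndTimestamp(42, 1704067200000),
          ("some_topic_name", 1): None}
ends = {("some_topic_name", 0): 100, ("some_topic_name", 1): 7}
print(resolve_start_offsets(result, ends))
```

With a real consumer, the two arguments would come from consumer.offsets_for_times(...) and consumer.end_offsets(...).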
In the end, I did it like this :) My code looks like this:
import json
from datetime import datetime

from dateutil.relativedelta import relativedelta
from kafka import KafkaConsumer, TopicPartition

topic = 'some_topic_name'
consumer = KafkaConsumer(bootstrap_servers=PROD_KAFKA_SERVER,
                         security_protocol=PROTOCOL,
                         group_id=GROUP_ID,
                         sasl_mechanism=SASL_MECHANISM, sasl_plain_username=SASL_USERNAME,
                         sasl_plain_password=SASL_PASSWORD)
month_ago = (datetime.now() - relativedelta(months=1)).timestamp()
# Manually assign partition 0 of the topic instead of subscribing.
topic_partition = TopicPartition(topic, 0)
assigned_topic = [topic_partition]
consumer.assign(assigned_topic)
partitions = consumer.assignment()
# Kafka timestamps are in milliseconds since the epoch.
partition_to_timestamp = {part: int(month_ago * 1000) for part in partitions}
end_offsets = consumer.end_offsets(list(partition_to_timestamp.keys()))
mapping = consumer.offsets_for_times(partition_to_timestamp)
for partition, ts in mapping.items():
    end_offset = end_offsets.get(partition)
    # ts is None if the partition has no message at or after the timestamp.
    consumer.seek(partition, ts[0])
for msg in consumer:
    value = json.loads(msg.value.decode('utf-8'))
    # do something
    # Stop after the last message that existed when end_offsets was called.
    if msg.offset == end_offset - 1:
        consumer.close()
        break
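The code above handles a single partition and relies on the last pre-existing message arriving. A possible generalization to several partitions is sketched below, under assumptions: read_window is a hypothetical helper, the consumer argument is expected to behave like a kafka-python KafkaConsumer (only its seek, pause and poll methods are used), and max_idle_polls is an illustrative safeguard so the loop gives up instead of blocking forever.

```python
def read_window(consumer, start_offsets, end_offsets,
                poll_timeout_ms=1000, max_idle_polls=5):
    """Yield records from each partition's start offset up to its end offset."""
    remaining = set()
    for tp, start in start_offsets.items():
        if start < end_offsets[tp]:
            consumer.seek(tp, start)
            remaining.add(tp)
        else:
            # Nothing to read in this partition, so stop fetching from it.
            consumer.pause(tp)

    idle_polls = 0
    while remaining and idle_polls < max_idle_polls:
        # poll() returns a dict mapping each partition to a list of records.
        batches = consumer.poll(timeout_ms=poll_timeout_ms)
        if not batches:
            idle_polls += 1
            continue
        idle_polls = 0
        for tp, records in batches.items():
            for record in records:
                if tp not in remaining:
                    break
                if record.offset < end_offsets[tp]:
                    yield record
                if record.offset >= end_offsets[tp] - 1:
                    # Reached the last offset of the window for this partition.
                    remaining.discard(tp)
                    consumer.pause(tp)
```

A call might look like `for msg in read_window(consumer, starts, consumer.end_offsets(list(starts))): ...`, where starts maps each assigned TopicPartition to the offset found via offsets_for_times.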