如何将 Faust 中的消费者设置为特定的偏移量
how to set the consumer in Faust to a specific offset
从 Faust 文档中我无法找到如何将消费者设置为特定的偏移量。
使用 confluent-kafka,我使用 consumer.offsets_for_times 找到一个 start_offset,然后将 TopicPartition 分配给该特定偏移量,例如:
start_offset = consumer.offsets_for_times([
TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])
consumer.assign([
TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])
对于浮士德我找不到更多的东西:
consumer_auto_offset_reset
只允许你设置最早或最晚。我如何从特定的时间或一天的开始开始阅读?
我认为这可能是您要查找的内容:https://faust.readthedocs.io/en/latest/reference/faust.transport.consumer.html#faust.transport.consumer.Consumer.seek
它可以转到特定的偏移量,但是我不认为没有一些额外的逻辑就可以告诉它转到特定的时间或日期(也许使用偏移量进行二进制搜索?) .
要将偏移量设置为特定值,您可以使用这些示例。在这里,我将偏移量设置为 50000。每次我启动我的应用程序时,代理都会从偏移量 50000 处开始读取。为此,我使用 app.consumer.seek
这里 tp 接受两个参数,主题 - 在这种情况下为测试,0 是分区号。
欲了解更多信息 faust.types
from faust.types import TP, Message
tp = TP("test", 0)
topic = app.topic(tp.topic)
@app.task()
async def on_start():
await app.consumer.seek(tp, 50000)
print("App startet")
@app.agent(topic)
async def receive(stream):
async for event in stream.events():
print((event.message.offset, event.value))
从 Faust 文档中我无法找到如何将消费者设置为特定的偏移量。
使用 confluent-kafka,我使用 consumer.offsets_for_times 找到一个 start_offset,然后将 TopicPartition 分配给该特定偏移量,例如:
start_offset = consumer.offsets_for_times([
TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])
consumer.assign([
TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])
对于浮士德我找不到更多的东西:
consumer_auto_offset_reset
只允许你设置最早或最晚。我如何从特定的时间或一天的开始开始阅读?
我认为这可能是您要查找的内容:https://faust.readthedocs.io/en/latest/reference/faust.transport.consumer.html#faust.transport.consumer.Consumer.seek
它可以转到特定的偏移量,但是我不认为没有一些额外的逻辑就可以告诉它转到特定的时间或日期(也许使用偏移量进行二进制搜索?) .
要将偏移量设置为特定值,您可以使用这些示例。在这里,我将偏移量设置为 50000。每次我启动我的应用程序时,代理都会从偏移量 50000 处开始读取。为此,我使用 app.consumer.seek
这里 tp 接受两个参数,主题 - 在这种情况下为测试,0 是分区号。 欲了解更多信息 faust.types
from faust.types import TP, Message
tp = TP("test", 0)
topic = app.topic(tp.topic)
@app.task()
async def on_start():
await app.consumer.seek(tp, 50000)
print("App startet")
@app.agent(topic)
async def receive(stream):
async for event in stream.events():
print((event.message.offset, event.value))