Apache Pulsar - Consumer.seek() 方法按时间戳的行为是什么?
Apache Pulsar - What is the behaviour of the Consumer.seek() method by timestamp?
https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-
在消费者上调用seek(long timestamp)方法时,timestamp是否必须等于消息发布的确切时间?
例如,如果我在 t=1、5、7 发送了三条消息,并且如果我调用 consumer.seek(3),我会收到错误消息吗?或者我的消费者会重置为 t=3,这样如果我调用 consumer.next(),我会收到第二条消息吗?
提前致谢,
Consumer#seek(long timestamp)
允许您将订阅重置为给定的时间戳。寻找之后,消费者将开始接收发布时间等于或大于传递给 seek
方法的时间戳的消息。
以下示例显示如何将消费者重置为前一小时:
try (
// Create PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create Consumer subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// Seek consumer to previous hour
consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"Message received: key=%s, value=%s, topic=%s, id=%s%n",
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
请注意,如果您有多个属于同一订阅的消费者(例如,Key_Shared),那么所有消费者都将被重置。
https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-
在消费者上调用seek(long timestamp)方法时,timestamp是否必须等于消息发布的确切时间? 例如,如果我在 t=1、5、7 发送了三条消息,并且如果我调用 consumer.seek(3),我会收到错误消息吗?或者我的消费者会重置为 t=3,这样如果我调用 consumer.next(),我会收到第二条消息吗?
提前致谢,
Consumer#seek(long timestamp)
允许您将订阅重置为给定的时间戳。寻找之后,消费者将开始接收发布时间等于或大于传递给 seek
方法的时间戳的消息。
以下示例显示如何将消费者重置为前一小时:
try (
// Create PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create Consumer subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// Seek consumer to previous hour
consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"Message received: key=%s, value=%s, topic=%s, id=%s%n",
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
请注意,如果您有多个属于同一订阅的消费者(例如,Key_Shared),那么所有消费者都将被重置。