Google 未处理 PubSub 重发消息
Google PubSub resent messages aren't being processed
我使用了 google 文档中的订户示例 Google PubSub
我所做的唯一修改是注释掉消息的确认。
订阅者不再向队列中添加消息,而应根据google云控制台中设置的间隔重新发送消息。
为什么会发生这种情况,还是我遗漏了什么?
public class SubscriberExample {
use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
static class MessageReceiverExample implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
messages.offer(message);
//consumer.ack();
}
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
// set subscriber id, eg. my-sub
String subscriptionId = args[0];
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
PROJECT_ID, subscriptionId);
Subscriber subscriber = null;
try {
// create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
subscriber.startAsync().awaitRunning();
// Continue to listen to messages
while (true) {
PubsubMessage message = messages.take();
System.out.println("Message Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
}
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}
当您不确认消息时,Java 客户端库调用 modifyAckDeadline on the message until maxAckExtensionPeriod 通过。默认情况下,此值为一小时。因此,如果您不 ack/nack 消息或更改此值,则消息很可能在一小时内不会重新传送。如果要更改最大确认延长期,请在构建器上设置:
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
.setMaxAckExtensionPeriod(Duration.ofSeconds(60))
.build();
还值得注意的是,当您不确认或拒绝消息时,flow control 可能会阻止更多消息的传递。默认情况下,Java 客户端库允许最多 1000 条消息未完成,即等待 ack 或 nack 或最大 ack 扩展期过去。
我使用了 google 文档中的订户示例 Google PubSub 我所做的唯一修改是注释掉消息的确认。
订阅者不再向队列中添加消息,而应根据google云控制台中设置的间隔重新发送消息。
为什么会发生这种情况,还是我遗漏了什么?
public class SubscriberExample {
use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
static class MessageReceiverExample implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
messages.offer(message);
//consumer.ack();
}
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
// set subscriber id, eg. my-sub
String subscriptionId = args[0];
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
PROJECT_ID, subscriptionId);
Subscriber subscriber = null;
try {
// create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
subscriber.startAsync().awaitRunning();
// Continue to listen to messages
while (true) {
PubsubMessage message = messages.take();
System.out.println("Message Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
}
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}
当您不确认消息时,Java 客户端库调用 modifyAckDeadline on the message until maxAckExtensionPeriod 通过。默认情况下,此值为一小时。因此,如果您不 ack/nack 消息或更改此值,则消息很可能在一小时内不会重新传送。如果要更改最大确认延长期,请在构建器上设置:
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
.setMaxAckExtensionPeriod(Duration.ofSeconds(60))
.build();
还值得注意的是,当您不确认或拒绝消息时,flow control 可能会阻止更多消息的传递。默认情况下,Java 客户端库允许最多 1000 条消息未完成,即等待 ack 或 nack 或最大 ack 扩展期过去。