订阅 Google Pub/Sub 主题 X 秒,如果没有收到消息则停止
Subscribe to Google Pub/Sub Topic for X seconds and stop if no messages are received
我正在使用 Google Pub/Sub Java SDK 订阅主题。我想要做的是:
- 开始听一个主题 X 秒(假设 25 秒)
- 如果收到消息,则停止收听并处理消息(这可能需要几分钟)
- 处理完消息后,继续监听主题 25 秒
- 如果在 25 秒内没有收到消息,则完全停止收听
我似乎只能在文档中找到任何内容。也许这是不可能的?
以下是我启动订阅者的方式:
// Create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver()).build();
// Start subscriber
subscriber.startAsync().awaitRunning();
// Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
subscriber.awaitTerminated();
这就是我的消息接收者的样子:
public class PubSubRoeMessageReceiver implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
// Acknowledge message
System.out.println("Acknowledge message");
ackReplyConsumer.ack();
// TODO: stop the subscriber
// TODO: run task X
// TODO: start the subscriber
}
}
有什么想法吗?
以这种方式使用 Cloud Pub/Sub 是一种反模式,会导致问题。如果你在收到消息后立即确认消息,但在你处理它之前,如果订阅者因为某种原因崩溃了,你会怎么做? Pub/Sub 不会重新传送消息,因此可能永远不会处理它。
因此,您可能希望等到消息处理完毕后再进行确认。但是,您将无法关闭订阅者,因为消息未完成这一事实将会丢失,因此,ack 截止日期将过期并且消息将被重新传递。
如果你想确保客户端一次只接收一条消息,你可以使用 FlowControlSettings on the client. If you set MaxOutstandingElementCount 到 1,那么一次只会将一条消息传递给 receiveMessage
:
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver())
.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // 10MB messages allowed.
.setMaxOutstandingElementCount(1L) // Only 1 outstanding message at a time.
.build())
.build();
请记住,如果您在启动订阅者时有大量积压的小消息,并且您打算启动多个订阅者,您可能 运行 进入低效的负载平衡,如documentation.
我正在使用 Google Pub/Sub Java SDK 订阅主题。我想要做的是:
- 开始听一个主题 X 秒(假设 25 秒)
- 如果收到消息,则停止收听并处理消息(这可能需要几分钟)
- 处理完消息后,继续监听主题 25 秒
- 如果在 25 秒内没有收到消息,则完全停止收听
我似乎只能在文档中找到任何内容。也许这是不可能的?
以下是我启动订阅者的方式:
// Create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver()).build();
// Start subscriber
subscriber.startAsync().awaitRunning();
// Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
subscriber.awaitTerminated();
这就是我的消息接收者的样子:
public class PubSubRoeMessageReceiver implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
// Acknowledge message
System.out.println("Acknowledge message");
ackReplyConsumer.ack();
// TODO: stop the subscriber
// TODO: run task X
// TODO: start the subscriber
}
}
有什么想法吗?
以这种方式使用 Cloud Pub/Sub 是一种反模式,会导致问题。如果你在收到消息后立即确认消息,但在你处理它之前,如果订阅者因为某种原因崩溃了,你会怎么做? Pub/Sub 不会重新传送消息,因此可能永远不会处理它。
因此,您可能希望等到消息处理完毕后再进行确认。但是,您将无法关闭订阅者,因为消息未完成这一事实将会丢失,因此,ack 截止日期将过期并且消息将被重新传递。
如果你想确保客户端一次只接收一条消息,你可以使用 FlowControlSettings on the client. If you set MaxOutstandingElementCount 到 1,那么一次只会将一条消息传递给 receiveMessage
:
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver())
.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // 10MB messages allowed.
.setMaxOutstandingElementCount(1L) // Only 1 outstanding message at a time.
.build())
.build();
请记住,如果您在启动订阅者时有大量积压的小消息,并且您打算启动多个订阅者,您可能 运行 进入低效的负载平衡,如documentation.