订阅 Google Pub/Sub 主题 X 秒,如果没有收到消息则停止

Subscribe to Google Pub/Sub Topic for X seconds and stop if no messages are received

我正在使用 Google Pub/Sub Java SDK 订阅主题。我想要做的是:

我似乎只能在文档中找到任何内容。也许这是不可能的?

以下是我启动订阅者的方式:

// Create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver()).build();

// Start subscriber
subscriber.startAsync().awaitRunning();

// Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
subscriber.awaitTerminated();

这就是我的消息接收者的样子:

public class PubSubRoeMessageReceiver implements MessageReceiver {
    @Override
    public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
        // Acknowledge message
        System.out.println("Acknowledge message");
        ackReplyConsumer.ack();

        // TODO: stop the subscriber

        // TODO: run task X

        // TODO: start the subscriber
    }
}

有什么想法吗?

以这种方式使用 Cloud Pub/Sub 是一种反模式,会导致问题。如果你在收到消息后立即确认消息,但在你处理它之前,如果订阅者因为某种原因崩溃了,你会怎么做? Pub/Sub 不会重新传送消息,因此可能永远不会处理它。

因此,您可能希望等到消息处理完毕后再进行确认。但是,您将无法关闭订阅者,因为消息未完成这一事实将会丢失,因此,ack 截止日期将过期并且消息将被重新传递。

如果你想确保客户端一次只接收一条消息,你可以使用 FlowControlSettings on the client. If you set MaxOutstandingElementCount 到 1,那么一次只会将一条消息传递给 receiveMessage:

subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver())
    .setFlowControlSettings(FlowControlSettings.newBuilder()
        .setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // 10MB messages allowed.
        .setMaxOutstandingElementCount(1L) // Only 1 outstanding message at a time.
        .build())
    .build();

请记住,如果您在启动订阅者时有大量积压的小消息,并且您打算启动多个订阅者,您可能 运行 进入低效的负载平衡,如documentation.