如何通过 Java 中的 MessageReceiver 以编程方式停止和启动 PubSub

How to stop and start a PubSub programatically through the MessageReceiver in Java

是否可以使用 Java 以编程方式暂停和启动 GCP PubSub Subscriber(pull)

我有以下 MessageReceiver 代码:

MessageReceiver receiver = (message, consumer) -> {
            System.out.println("Received message: " + message.getData().toStringUtf8() + " at " + LocalDateTime.now());

            Mono<Void> mono = Mono.just(message.getData().toStringUtf8())
                    .flatMap(deserializePayload())
                    .doOnSuccess(creditNoteRequestDTO -> configureMDC(message.getAttributesMap(), creditNoteRequestDTO.getOrderNumber()))
                    .doOnError(error -> log.error("Problem while parsing CreditNoteRequest DTO {}", message.getData().toStringUtf8(), error))
                    .map(creditNoteRequestMapper::mapToCreditNoteRequest)
                    .flatMap(creditNoteRequestProcess::process)
                    .doOnError(throwable -> {
                        if (throwable instanceof CallNotPermittedException) ConstantUtils.ENABLED_SUBSCRIBER = false;
                        consumer.nack();
                    })
                    .doOnSuccess(unused -> {
                        log.info("Doing ACK on payload {}", message.getData().toStringUtf8());
                        consumer.ack();
                    });

            if (Boolean.TRUE.equals(ConstantUtils.ENABLED_SUBSCRIBER)) {
                mono.subscribe();
            } else {
                System.err.println("Subscription has been disabled.");
                //mono.subscribe().dispose();
                try {
                    System.err.println("PRE: " + LocalDateTime.now() + " " + ConstantUtils.ENABLED_SUBSCRIBER);
                    TimeUnit.SECONDS.sleep(15);
                    ConstantUtils.ENABLED_SUBSCRIBER = true;
                    System.err.println("POST: " + LocalDateTime.now() + " " + ConstantUtils.ENABLED_SUBSCRIBER);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        };

以及 subscriber 的以下内容:

Subscriber subscriber = null;
        FlowControlSettings controlSettings = FlowControlSettings
                .newBuilder()
                .setMaxOutstandingElementCount(2000L)
                .build();

        subscriber = Subscriber
                .newBuilder(subscription, receiver)
                .setFlowControlSettings(controlSettings)
                .build();

        // Start the subscriber.
        subscriber.startAsync().awaitRunning();
        System.out.printf("Listening for messages on %s:\n", subscription);
        System.out.printf("Max elements count: %s\n", subscriber.getFlowControlSettings().getMaxOutstandingElementCount());
        // Allow the subscriber to run for 45s unless an unrecoverable error occurs.

        Subscriber finalSubscriber = subscriber;
        new java.util.Timer().schedule(
                new java.util.TimerTask() {
                    @Override
                    public void run() {
                        System.err.println("Subscriber has been shut down.");
                        finalSubscriber.stopAsync();
                    }
                },
                120000
        );

可以在 MessageReceiver 上使用 finalSubscriber.stopAsync(); 或暂停 messageReceiver?

更新:谢谢,我设法阻止了它,但如何重新启动它?现在,当我尝试重新启动时,没有任何反应。

private void createSubscriber() {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of("txd-boss-dev", "circuit-breaker-test-sub");
        this.subscriber = Subscriber.newBuilder(subscription, getMessageReceiver()).build();
    }

    private void runSubscriber(boolean start) {
        if(start) {

            try {
                this.subscriber.startAsync().awaitRunning();
                System.out.printf("Listening for messages on %s:\n", this.subscriber.getSubscriptionNameString());
                subscriber.awaitTerminated(240, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.err.println("Shutting down subscription.");
                runSubscriber(false);
            }

        } else {
            subscriber.stopAsync();
        }
    }

    @EventListener(ContextRefreshedEvent.class)
    public void test() {
        createSubscriber();
        runSubscriber(true);
    }

您需要return同一个订阅者对象来启动和停止它:

检查一些 google 个示例 here

这是草图(适合您的class):

private Subscriber StartSubscriber() {

    Subscriber subscriber = null;
    try {
        subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
        subscriber.startAsync().awaitRunning();
        System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
        subscriber.awaitTerminated(30, TimeUnit.SECONDS);
        return subscriber;
    } catch (TimeoutException timeoutException) {
        // Shut down the subscriber after 30s. Stop receiving messages.
        System.out.printf("Error \n");
        subscriber.stopAsync();
    }

}

private void StopSubscriber(Subscriber subscriber) {

    System.out.printf("Stoping subscriber\n");
    subscriber.stopAsync();
}


public void test() {
    Subscriber subscriber = StartSubscriber();
    StopSubscriber(subscriber);
}