如何通过 Java 中的 MessageReceiver 以编程方式停止和启动 PubSub
How to stop and start a Pub/Sub subscriber programmatically through the MessageReceiver in Java
是否可以使用 Java 以编程方式暂停并重新启动 GCP Pub/Sub 的拉取(pull)订阅者 Subscriber?
我有以下 MessageReceiver
代码:
// Pub/Sub callback. While ConstantUtils.ENABLED_SUBSCRIBER is TRUE each message is run through
// the reactive pipeline (deserialize -> map -> process) and acked on success / nacked on error.
// A CallNotPermittedException from processing flips the flag to false; the next delivery then
// takes the "paused" branch below instead of being processed.
MessageReceiver receiver = (message, consumer) -> {
    System.out.println("Received message: " + message.getData().toStringUtf8() + " at " + LocalDateTime.now());
    Mono<Void> mono = Mono.just(message.getData().toStringUtf8())
            .flatMap(deserializePayload())
            .doOnSuccess(creditNoteRequestDTO -> configureMDC(message.getAttributesMap(), creditNoteRequestDTO.getOrderNumber()))
            .doOnError(error -> log.error("Problem while parsing CreditNoteRequest DTO {}", message.getData().toStringUtf8(), error))
            .map(creditNoteRequestMapper::mapToCreditNoteRequest)
            .flatMap(creditNoteRequestProcess::process)
            .doOnError(throwable -> {
                // Disable further processing when the call was rejected (CallNotPermittedException).
                if (throwable instanceof CallNotPermittedException) {
                    ConstantUtils.ENABLED_SUBSCRIBER = false;
                }
                consumer.nack();
            })
            .doOnSuccess(unused -> {
                log.info("Doing ACK on payload {}", message.getData().toStringUtf8());
                consumer.ack();
            });
    if (Boolean.TRUE.equals(ConstantUtils.ENABLED_SUBSCRIBER)) {
        mono.subscribe();
    } else {
        System.err.println("Subscription has been disabled.");
        try {
            System.err.println("PRE: " + LocalDateTime.now() + " " + ConstantUtils.ENABLED_SUBSCRIBER);
            // Crude pause: blocks the thread that invoked this callback for 15 seconds,
            // then re-enables processing.
            TimeUnit.SECONDS.sleep(15);
            ConstantUtils.ENABLED_SUBSCRIBER = true;
            System.err.println("POST: " + LocalDateTime.now() + " " + ConstantUtils.ENABLED_SUBSCRIBER);
        } catch (InterruptedException e) {
            log.error("Interrupted while the subscriber was paused", e);
            // Restore the interrupt flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // The message was not processed in this branch, so it must be nacked explicitly;
            // previously it was left without any ack/nack reply.
            consumer.nack();
        }
    }
};
以及下面这段创建并启动 Subscriber 的代码:
// Allow at most 2000 outstanding (delivered but not yet acked/nacked) messages.
FlowControlSettings controlSettings = FlowControlSettings
        .newBuilder()
        .setMaxOutstandingElementCount(2000L)
        .build();
Subscriber subscriber = Subscriber
        .newBuilder(subscription, receiver)
        .setFlowControlSettings(controlSettings)
        .build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscription);
System.out.printf("Max elements count: %s\n", subscriber.getFlowControlSettings().getMaxOutstandingElementCount());
// Allow the subscriber to run for 120s (120000 ms), then request an asynchronous shutdown.
final Subscriber finalSubscriber = subscriber;
final java.util.Timer shutdownTimer = new java.util.Timer();
shutdownTimer.schedule(
        new java.util.TimerTask() {
            @Override
            public void run() {
                System.err.println("Subscriber has been shut down.");
                finalSubscriber.stopAsync();
                // The timer thread is non-daemon by default; cancel the timer so the thread
                // terminates once this one-shot task has run instead of lingering.
                shutdownTimer.cancel();
            }
        },
        120000
);
是否可以在 MessageReceiver 内部调用 finalSubscriber.stopAsync();,或者以其他方式暂停 MessageReceiver?
更新:谢谢,我已经成功把它停止了,但要如何重新启动它?现在当我尝试重新启动时,没有任何反应。
/**
 * Builds a new Subscriber for the hard-coded project/subscription pair and stores it in
 * {@code this.subscriber}. The subscriber is only built here, not started.
 */
private void createSubscriber() {
    final String projectId = "txd-boss-dev";
    final String subscriptionId = "circuit-breaker-test-sub";
    final ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of(projectId, subscriptionId);
    this.subscriber = Subscriber
            .newBuilder(subscriptionName, getMessageReceiver())
            .build();
}
/**
 * Starts or stops the subscriber held in {@code this.subscriber}.
 *
 * <p>When starting, this call blocks until the subscriber terminates or 240 seconds elapse;
 * on timeout the subscriber is asked to stop.
 *
 * <p>A Subscriber instance can be started only once. If the current instance has already been
 * started (or stopped), it is stopped and replaced with a fresh one from
 * {@link #createSubscriber()} so that calling {@code runSubscriber(true)} again really restarts
 * message delivery.
 *
 * @param start {@code true} to (re)start the subscriber, {@code false} to stop it
 */
private void runSubscriber(boolean start) {
    if (!start) {
        // Nothing to stop if no subscriber was ever created.
        if (this.subscriber != null) {
            this.subscriber.stopAsync();
        }
        return;
    }

    if (this.subscriber == null) {
        createSubscriber();
    } else if (this.subscriber.state() != com.google.api.core.ApiService.State.NEW) {
        // Already used: make sure the old instance is shutting down, then build a new one.
        this.subscriber.stopAsync();
        createSubscriber();
    }

    this.subscriber.startAsync().awaitRunning();
    System.out.printf("Listening for messages on %s:\n", this.subscriber.getSubscriptionNameString());
    try {
        this.subscriber.awaitTerminated(240, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        // Expected path: the subscriber was still running after 240 seconds.
        System.err.println("Shutting down subscription.");
        this.subscriber.stopAsync();
    }
}
/**
 * Invoked on {@code ContextRefreshedEvent}: builds the subscriber and then starts it.
 */
@EventListener(ContextRefreshedEvent.class)
public void test() {
    this.createSubscriber();
    final boolean startRequested = true;
    this.runSubscriber(startRequested);
}
您需要返回(return)同一个 Subscriber 对象,才能用它来启动和停止订阅。注意:已经停止的 Subscriber 实例无法再次启动,若要重新开始接收消息,需要新建一个实例:
可以参考 Google 官方提供的一些示例(见原文中的 here 链接)。
下面是一个草图(请根据您自己的类进行调整):
/**
 * Builds a new Subscriber, starts it, lets it receive messages for up to 30 seconds and
 * returns it so the caller can stop it through the same object.
 *
 * <p>If the subscriber is still running after 30 seconds it is asked to stop before being
 * returned. Each call builds a fresh instance.
 *
 * @return the subscriber that was started (never {@code null})
 */
private Subscriber StartSubscriber() {
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    subscriber.startAsync().awaitRunning();
    System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
    try {
        subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
        // Shut down the subscriber after 30s. Stop receiving messages.
        System.out.printf("Error \n");
        subscriber.stopAsync();
    }
    // Return on every path; the original had no return after the timeout branch.
    return subscriber;
}
/**
 * Prints a notice and asks the given subscriber to stop.
 *
 * @param subscriber the subscriber to stop
 */
private void StopSubscriber(Subscriber subscriber) {
    final String notice = "Stoping subscriber\n";
    System.out.printf(notice);
    subscriber.stopAsync();
}
/**
 * Demo entry point: starts a subscriber and then stops that same instance.
 */
public void test() {
    StopSubscriber(StartSubscriber());
}
是否可以使用 Java 以编程方式暂停并重新启动 GCP Pub/Sub 的拉取(pull)订阅者 Subscriber?
我有以下 MessageReceiver
代码:
// Pub/Sub callback. While ConstantUtils.ENABLED_SUBSCRIBER is TRUE each message is run through
// the reactive pipeline (deserialize -> map -> process) and acked on success / nacked on error.
// A CallNotPermittedException from processing flips the flag to false; the next delivery then
// takes the "paused" branch below instead of being processed.
MessageReceiver receiver = (message, consumer) -> {
    System.out.println("Received message: " + message.getData().toStringUtf8() + " at " + LocalDateTime.now());
    Mono<Void> mono = Mono.just(message.getData().toStringUtf8())
            .flatMap(deserializePayload())
            .doOnSuccess(creditNoteRequestDTO -> configureMDC(message.getAttributesMap(), creditNoteRequestDTO.getOrderNumber()))
            .doOnError(error -> log.error("Problem while parsing CreditNoteRequest DTO {}", message.getData().toStringUtf8(), error))
            .map(creditNoteRequestMapper::mapToCreditNoteRequest)
            .flatMap(creditNoteRequestProcess::process)
            .doOnError(throwable -> {
                // Disable further processing when the call was rejected (CallNotPermittedException).
                if (throwable instanceof CallNotPermittedException) {
                    ConstantUtils.ENABLED_SUBSCRIBER = false;
                }
                consumer.nack();
            })
            .doOnSuccess(unused -> {
                log.info("Doing ACK on payload {}", message.getData().toStringUtf8());
                consumer.ack();
            });
    if (Boolean.TRUE.equals(ConstantUtils.ENABLED_SUBSCRIBER)) {
        mono.subscribe();
    } else {
        System.err.println("Subscription has been disabled.");
        try {
            System.err.println("PRE: " + LocalDateTime.now() + " " + ConstantUtils.ENABLED_SUBSCRIBER);
            // Crude pause: blocks the thread that invoked this callback for 15 seconds,
            // then re-enables processing.
            TimeUnit.SECONDS.sleep(15);
            ConstantUtils.ENABLED_SUBSCRIBER = true;
            System.err.println("POST: " + LocalDateTime.now() + " " + ConstantUtils.ENABLED_SUBSCRIBER);
        } catch (InterruptedException e) {
            log.error("Interrupted while the subscriber was paused", e);
            // Restore the interrupt flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // The message was not processed in this branch, so it must be nacked explicitly;
            // previously it was left without any ack/nack reply.
            consumer.nack();
        }
    }
};
以及下面这段创建并启动 Subscriber 的代码:
// Allow at most 2000 outstanding (delivered but not yet acked/nacked) messages.
FlowControlSettings controlSettings = FlowControlSettings
        .newBuilder()
        .setMaxOutstandingElementCount(2000L)
        .build();
Subscriber subscriber = Subscriber
        .newBuilder(subscription, receiver)
        .setFlowControlSettings(controlSettings)
        .build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscription);
System.out.printf("Max elements count: %s\n", subscriber.getFlowControlSettings().getMaxOutstandingElementCount());
// Allow the subscriber to run for 120s (120000 ms), then request an asynchronous shutdown.
final Subscriber finalSubscriber = subscriber;
final java.util.Timer shutdownTimer = new java.util.Timer();
shutdownTimer.schedule(
        new java.util.TimerTask() {
            @Override
            public void run() {
                System.err.println("Subscriber has been shut down.");
                finalSubscriber.stopAsync();
                // The timer thread is non-daemon by default; cancel the timer so the thread
                // terminates once this one-shot task has run instead of lingering.
                shutdownTimer.cancel();
            }
        },
        120000
);
是否可以在 MessageReceiver 内部调用 finalSubscriber.stopAsync();,或者以其他方式暂停 MessageReceiver?
更新:谢谢,我已经成功把它停止了,但要如何重新启动它?现在当我尝试重新启动时,没有任何反应。
/**
 * Builds a new Subscriber for the hard-coded project/subscription pair and stores it in
 * {@code this.subscriber}. The subscriber is only built here, not started.
 */
private void createSubscriber() {
    final String projectId = "txd-boss-dev";
    final String subscriptionId = "circuit-breaker-test-sub";
    final ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of(projectId, subscriptionId);
    this.subscriber = Subscriber
            .newBuilder(subscriptionName, getMessageReceiver())
            .build();
}
/**
 * Starts or stops the subscriber held in {@code this.subscriber}.
 *
 * <p>When starting, this call blocks until the subscriber terminates or 240 seconds elapse;
 * on timeout the subscriber is asked to stop.
 *
 * <p>A Subscriber instance can be started only once. If the current instance has already been
 * started (or stopped), it is stopped and replaced with a fresh one from
 * {@link #createSubscriber()} so that calling {@code runSubscriber(true)} again really restarts
 * message delivery.
 *
 * @param start {@code true} to (re)start the subscriber, {@code false} to stop it
 */
private void runSubscriber(boolean start) {
    if (!start) {
        // Nothing to stop if no subscriber was ever created.
        if (this.subscriber != null) {
            this.subscriber.stopAsync();
        }
        return;
    }

    if (this.subscriber == null) {
        createSubscriber();
    } else if (this.subscriber.state() != com.google.api.core.ApiService.State.NEW) {
        // Already used: make sure the old instance is shutting down, then build a new one.
        this.subscriber.stopAsync();
        createSubscriber();
    }

    this.subscriber.startAsync().awaitRunning();
    System.out.printf("Listening for messages on %s:\n", this.subscriber.getSubscriptionNameString());
    try {
        this.subscriber.awaitTerminated(240, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        // Expected path: the subscriber was still running after 240 seconds.
        System.err.println("Shutting down subscription.");
        this.subscriber.stopAsync();
    }
}
/**
 * Invoked on {@code ContextRefreshedEvent}: builds the subscriber and then starts it.
 */
@EventListener(ContextRefreshedEvent.class)
public void test() {
    this.createSubscriber();
    final boolean startRequested = true;
    this.runSubscriber(startRequested);
}
您需要返回(return)同一个 Subscriber 对象,才能用它来启动和停止订阅。注意:已经停止的 Subscriber 实例无法再次启动,若要重新开始接收消息,需要新建一个实例:
可以参考 Google 官方提供的一些示例(见原文中的 here 链接)。
下面是一个草图(请根据您自己的类进行调整):
/**
 * Builds a new Subscriber, starts it, lets it receive messages for up to 30 seconds and
 * returns it so the caller can stop it through the same object.
 *
 * <p>If the subscriber is still running after 30 seconds it is asked to stop before being
 * returned. Each call builds a fresh instance.
 *
 * @return the subscriber that was started (never {@code null})
 */
private Subscriber StartSubscriber() {
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    subscriber.startAsync().awaitRunning();
    System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
    try {
        subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
        // Shut down the subscriber after 30s. Stop receiving messages.
        System.out.printf("Error \n");
        subscriber.stopAsync();
    }
    // Return on every path; the original had no return after the timeout branch.
    return subscriber;
}
/**
 * Prints a notice and asks the given subscriber to stop.
 *
 * @param subscriber the subscriber to stop
 */
private void StopSubscriber(Subscriber subscriber) {
    final String notice = "Stoping subscriber\n";
    System.out.printf(notice);
    subscriber.stopAsync();
}
/**
 * Demo entry point: starts a subscriber and then stops that same instance.
 */
public void test() {
    StopSubscriber(StartSubscriber());
}