在 long 运行 Cloud PubSub Subscriber 服务中捕获侦听器异常
Catching Listener Exceptions in long running Cloud PubSub Subscriber service
我正在尝试在 Java 中编写一个长 运行ning 订阅者服务。我已经设置了监听器来监听订阅者服务中的任何故障。我试图让这个容错,但我不太明白一些事情,下面是我的 doubts/questions.
- 我已遵循此处显示的基本设置 https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java。具体来说,我设置了 addListener,如下所示。
如以下代码所示,initializeSubscriber 充当一个状态变量,它将确定订阅服务器服务是否应该重新启动。在 while 循环内,这个变量被持续监控以确定是否需要重新启动。
我的问题是,
1. 如何在 Subscriber.Listener 的失败方法中引发异常并在主 while 循环中捕获它。我尝试在失败的方法中抛出一个新的 Exception() 并在内部的 catch 块中捕获它,但是,我无法编译代码,因为它是一个已检查的异常。
2. 如这里所示,我使用 Java Executor thread to 运行 Listener。如何处理侦听器故障?我能否在此处所示的一般异常捕获块下捕获侦听器故障?
try {
boolean initializeSubscriber = true;
while (true) {
try {
if (initializeSubscriber) {
createSingleThreadedSubscriber();
addErrorListenerToSubscriber();
subscriber.startAsync().awaitRunning();
initializeSubscriber = false;
}
// Checks the status of subscriber service every minute
Thread.sleep(60000);
} catch (Exception ex) {
LOGGER.error("Could not start the Subscriber service", ex);
cleanupSubscriber();
initializeSubscriber = true;
}
}
} catch (RuntimeException e) {
} finally {
shutdown();
}
private void addErrorListenerToSubscriber() {
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) throws RuntimeException {
LOGGER.info("Subscriber reached a failed state due to " + failure.getMessage()
+ ",Restarting Subscriber service");
initializeSubscriber = true;
}
},
Executors.newSingleThreadExecutor());
}
private void cleanupSubscriber() {
try {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
if (!subscriptionListener.isShutdown()) {
subscriptionListener.shutdown();
}
} catch (Exception ex) {
LOGGER.error("Error in cleaning up Subscriber thread " + ex);
}
}
如果您只想在失败时重新创建订阅者,则不必向订阅者添加侦听器。您可以改为在 awaitTerminated
:
上捕获异常
try {
boolean initializeSubscriber = true;
while (initializeSubscriber) {
try {
createSingleThreadedSubscriber();
subscriber.startAsync().awaitRunning();
initializeSubscriber = false;
subscriber.awaitTerminated();
} catch (Exception ex) {
LOGGER.error("Error in the Subscriber service", ex);
cleanupSubscriber();
initializeSubscriber = true;
}
}
} catch (RuntimeException e) {
} finally {
shutdown();
}
如果订阅者因为调用 stopAsync
而成功关闭,那么 awaitTerminated
将不会抛出异常。如果有某种异常,那么 awaitTerminated
将抛出 IllegalStateException
因为状态将是 FAILED
而不是 TERMINATED
.
请注意,暂时性错误由库本身处理。例如,如果服务器由于网络故障而暂时不可用,图书馆将无缝重新连接并继续传递消息。导致订阅者状态发生变化的失败可能是永久性失败,例如权限问题(其中帐户 运行 订阅者没有订阅订阅的权限)或资源问题(例如订阅已被删除)。在这些永久性故障情况下,重新创建订阅者可能只会导致相同的错误,除非采取手动步骤进行干预和解决问题。
我正在尝试在 Java 中编写一个长 运行ning 订阅者服务。我已经设置了监听器来监听订阅者服务中的任何故障。我试图让这个容错,但我不太明白一些事情,下面是我的 doubts/questions.
- 我已遵循此处显示的基本设置 https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java。具体来说,我设置了 addListener,如下所示。
如以下代码所示,initializeSubscriber 充当一个状态变量,它将确定订阅服务器服务是否应该重新启动。在 while 循环内,这个变量被持续监控以确定是否需要重新启动。
我的问题是, 1. 如何在 Subscriber.Listener 的失败方法中引发异常并在主 while 循环中捕获它。我尝试在失败的方法中抛出一个新的 Exception() 并在内部的 catch 块中捕获它,但是,我无法编译代码,因为它是一个已检查的异常。 2. 如这里所示,我使用 Java Executor thread to 运行 Listener。如何处理侦听器故障?我能否在此处所示的一般异常捕获块下捕获侦听器故障?
try {
boolean initializeSubscriber = true;
while (true) {
try {
if (initializeSubscriber) {
createSingleThreadedSubscriber();
addErrorListenerToSubscriber();
subscriber.startAsync().awaitRunning();
initializeSubscriber = false;
}
// Checks the status of subscriber service every minute
Thread.sleep(60000);
} catch (Exception ex) {
LOGGER.error("Could not start the Subscriber service", ex);
cleanupSubscriber();
initializeSubscriber = true;
}
}
} catch (RuntimeException e) {
} finally {
shutdown();
}
private void addErrorListenerToSubscriber() {
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) throws RuntimeException {
LOGGER.info("Subscriber reached a failed state due to " + failure.getMessage()
+ ",Restarting Subscriber service");
initializeSubscriber = true;
}
},
Executors.newSingleThreadExecutor());
}
private void cleanupSubscriber() {
try {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
if (!subscriptionListener.isShutdown()) {
subscriptionListener.shutdown();
}
} catch (Exception ex) {
LOGGER.error("Error in cleaning up Subscriber thread " + ex);
}
}
如果您只想在失败时重新创建订阅者,则不必向订阅者添加侦听器。您可以改为在 awaitTerminated
:
try {
boolean initializeSubscriber = true;
while (initializeSubscriber) {
try {
createSingleThreadedSubscriber();
subscriber.startAsync().awaitRunning();
initializeSubscriber = false;
subscriber.awaitTerminated();
} catch (Exception ex) {
LOGGER.error("Error in the Subscriber service", ex);
cleanupSubscriber();
initializeSubscriber = true;
}
}
} catch (RuntimeException e) {
} finally {
shutdown();
}
如果订阅者因为调用 stopAsync
而成功关闭,那么 awaitTerminated
将不会抛出异常。如果有某种异常,那么 awaitTerminated
将抛出 IllegalStateException
因为状态将是 FAILED
而不是 TERMINATED
.
请注意,暂时性错误由库本身处理。例如,如果服务器由于网络故障而暂时不可用,图书馆将无缝重新连接并继续传递消息。导致订阅者状态发生变化的失败可能是永久性失败,例如权限问题(其中帐户 运行 订阅者没有订阅订阅的权限)或资源问题(例如订阅已被删除)。在这些永久性故障情况下,重新创建订阅者可能只会导致相同的错误,除非采取手动步骤进行干预和解决问题。