在 long 运行 Cloud PubSub Subscriber 服务中捕获侦听器异常

Catching Listener Exceptions in long running Cloud PubSub Subscriber service

我正在尝试在 Java 中编写一个长 运行ning 订阅者服务。我已经设置了监听器来监听订阅者服务中的任何故障。我试图让这个容错,但我不太明白一些事情,下面是我的 doubts/questions.

  1. 我已遵循此处显示的基本设置 https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java。具体来说,我设置了 addListener,如下所示。

如以下代码所示,initializeSubscriber 充当一个状态变量,它将确定订阅服务器服务是否应该重新启动。在 while 循环内,这个变量被持续监控以确定是否需要重新启动。

我的问题是, 1. 如何在 Subscriber.Listener 的失败方法中引发异常并在主 while 循环中捕获它。我尝试在失败的方法中抛出一个新的 Exception() 并在内部的 catch 块中捕获它,但是,我无法编译代码,因为它是一个已检查的异常。 2. 如这里所示,我使用 Java Executor thread to 运行 Listener。如何处理侦听器故障?我能否在此处所示的一般异常捕获块下捕获侦听器故障?

try {
 boolean initializeSubscriber = true;
    while (true) {
        try {
           if (initializeSubscriber) { 
             createSingleThreadedSubscriber();
             addErrorListenerToSubscriber();
             subscriber.startAsync().awaitRunning();
             initializeSubscriber = false;
           }

          // Checks the status of subscriber service every minute
          Thread.sleep(60000);

        } catch (Exception ex) {
          LOGGER.error("Could not start the Subscriber service", ex);
          cleanupSubscriber();
          initializeSubscriber = true;
        }
    }
} catch (RuntimeException e) {

} finally {
    shutdown();
}
private void addErrorListenerToSubscriber() {
    subscriber.addListener(
      new Subscriber.Listener() {
          @Override
          public void failed(Subscriber.State from, Throwable failure) throws RuntimeException { 
            LOGGER.info("Subscriber reached a failed state due to " + failure.getMessage()
                + ",Restarting Subscriber service");
            initializeSubscriber = true; 
          }
      },
      Executors.newSingleThreadExecutor());
  }

  private void cleanupSubscriber() {

    try {
      if (subscriber != null) {
        subscriber.stopAsync().awaitTerminated();
      }
      if (!subscriptionListener.isShutdown()) {
        subscriptionListener.shutdown();
      }
    } catch (Exception ex) {
      LOGGER.error("Error in cleaning up Subscriber thread " + ex);
    }
  }

如果您只想在失败时重新创建订阅者,则不必向订阅者添加侦听器。您可以改为在 awaitTerminated:

上捕获异常
try {
  boolean initializeSubscriber = true;
  while (initializeSubscriber) {
    try { 
      createSingleThreadedSubscriber();
      subscriber.startAsync().awaitRunning();
      initializeSubscriber = false;
      subscriber.awaitTerminated();
    } catch (Exception ex) {
      LOGGER.error("Error in the Subscriber service", ex);
      cleanupSubscriber();
      initializeSubscriber = true;
    }
  }
}  catch (RuntimeException e) {
} finally {
  shutdown();
}

如果订阅者因为调用 stopAsync 而成功关闭,那么 awaitTerminated 将不会抛出异常。如果有某种异常,那么 awaitTerminated 将抛出 IllegalStateException 因为状态将是 FAILED 而不是 TERMINATED.

请注意,暂时性错误由库本身处理。例如,如果服务器由于网络故障而暂时不可用,图书馆将无缝重新连接并继续传递消息。导致订阅者状态发生变化的失败可能是永久性失败,例如权限问题(其中帐户 运行 订阅者没有订阅订阅的权限)或资源问题(例如订阅已被删除)。在这些永久性故障情况下,重新创建订阅者可能只会导致相同的错误,除非采取手动步骤进行干预和解决问题。