检查 Azure 服务总线订户中的应用程序准备情况

Check Readiness of application in azure service bus subscriber

我有一个场景,我在我们的微服务基础项目中从 azure 服务总线事件侦听器调用多个 self api。毕竟我们的应用程序的端点和我们调用 self API 时都很好,但是每当我们在所有其余点准备就绪之前在应用程序启动时收到一个事件时,我就会 运行 有问题。由于端点未就绪,应用程序抛出 I/O 异常。 我想要一种机制的解决方案,我们可以在其中检查应用程序是否准备就绪。

请在下面找到代码片段

    void registerMessageHandlerOnClient(SubscriptionClient receiveClient) throws ServiceBusException, InterruptedException {

    log.debug("registerMessageHandlerOnClient method invoked");
    ExecutorService threadPool = Executors.newFixedThreadPool(binderPropertyConfig.getThreadpool());
    ISessionHandler sessionHandler = new ISessionHandler() {

        @Override
        public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage message) {
            
            //I want some logic to wait for service to up 
            ...
            ...
            ...
            ...
        }

        @Override
        public void notifyException(Throwable exception, ExceptionPhase phase) {
            
        }

        @Override
        public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession session) {
            return CompletableFuture.completedFuture(null);
        }
    };

    receiveClient.registerSessionHandler(sessionHandler, new 
    SessionHandlerOptions(binderPropertyConfig.getConcurrency(), false, 
    Duration.ofMinutes(binderPropertyConfig.getMaxAutoRenewDurationInMinutes())), threadPool);

 }

非常感谢任何帮助。

根据您在问题中添加的标签,我假设您正在使用 spring 引导。因此,我能想到的一个简单解决方案是将 'Liveness/Readiness Probes' 添加到所有目标微服务中。探测只是一个与服务一起添加的 http 端点,用于检查服务是否启动。您可以考虑使用 Spring Boot Actuator(例如“/actuator/health/liveness”和“/actuator/health/readiness”http 端点)。这里有一些不错的帖子。注意:这些 Liveness 和 Readiness 概念不仅适用于 Kubernetes,而且无论部署平台如何,它们通常都很有用。

一旦在目标服务中启用它,然后在源事件侦听器中,您可以在注册消息处理程序之前等待这些服务准备就绪。

void registerMessageHandlerOnClient(SubscriptionClient receiveClient) throws ServiceBusException, InterruptedException {

    log.debug("registerMessageHandlerOnClient method invoked");
    ExecutorService threadPool = Executors.newFixedThreadPool(binderPropertyConfig.getThreadpool());
    ISessionHandler sessionHandler = new ISessionHandler() {

        @Override
        public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage message) {
            
            // make a REST (HTTP GET) call to "/actuator/health/liveness" endpoint of the target service(s) 
            // to check if up, if not, wait and repeat until up 
            // or give up after x retries with desired error handling logic
            ...
            ...
        }

        @Override
        public void notifyException(Throwable exception, ExceptionPhase phase) {
            
        }

        @Override
        public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession session) {
            return CompletableFuture.completedFuture(null);
        }
    };

    // make a REST call (HTTP GET) to "/actuator/health/readiness" endpoint of the target service(s) 
    // to check if ready, if not, wait and repeat until ready 
    // or give up after x retries with desired error handling logic.

    receiveClient.registerSessionHandler(sessionHandler, new 
    SessionHandlerOptions(binderPropertyConfig.getConcurrency(), false, 
    Duration.ofMinutes(binderPropertyConfig.getMaxAutoRenewDurationInMinutes())), threadPool);

 }

对于这种情况,我有两种解决方案,或者我们可以调用来检查自助服务健康状况,即使用应用程序 actuator 公开的健康端点。 或者我们可以实现 [ApplicationListener][1],它有一个方法,一旦应用程序准备就绪就会调用该方法。

@Component
public class ApplicationStartup 
implements ApplicationListener<ApplicationReadyEvent> {

  /**
   * This event is executed as late as conceivably possible to indicate that 
   * the application is ready to service requests.
   */
  @Override
  public void onApplicationEvent(final ApplicationReadyEvent event) {

    // here your code ...

    return;
  }
}

应用程序启动后,onApplicationEvent() 将收到事件。所以我们可以有逻辑,我们可以设置一些标志,我们可以使用标志作为消费者的启用点。