使用异步拉取不断从 Google PubSub 接收消息

Receive message from Google PubSub continuously using Asynchronous Pull

在这个 link 的帮助下,我成功地创建了一个小型 Java 应用程序,可以提取已发布的消息一分钟。我的实现看起来像这样。

    public static void eventListener() throws InterruptedException {

    MessageReceiver receiver = new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        System.out.println("Received message: " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };
    //Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscription, receiver)
          .setCredentialsProvider(FixedCredentialsProvider.create(creds)).build();
      subscriber.addListener(new Subscriber.Listener() {
        @Override
        public void failed(Subscriber.State from, Throwable failure) {
          // Handle failure. This is called when the Subscriber encountered a fatal error
          // and is
          // shutting down.
          System.err.println(failure);
        }
      }, MoreExecutors.directExecutor());
      subscriber.startAsync().awaitRunning();

      // In this example, we will pull messages for one minute (60,000ms) then stop.
      // In a real application, this sleep-then-stop is not necessary.
      // Simply call stopAsync().awaitTerminated() when the server is shutting down,
      // etc.
      Thread.sleep(60000);
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync().awaitTerminated();
      }
    }
  }

当我在main中调用这个方法时

public static void main(String[] args) throws InterruptedException {
  eventListener();

}

并将对象上传到我的 Google 云存储,程序会打印发布者发送的消息,如下所示

Received message: {
  "kind": "storage#object",
  "id": "roshanbucket/stones.jpg/1553765105996166",
  "selfLink": "https://www.googleapis.com/storage/v1/b/roshanbucket/o/stones.jpg",
  "name": "stones.jpg",
  "bucket": "roshanbucket",
  "generation": "1553765105996166",
  "metageneration": "1",
  "contentType": "image/jpeg",
  "timeCreated": "2019-03-28T09:25:05.995Z",
  "updated": "2019-03-28T09:25:05.995Z",
  "storageClass": "STANDARD",
  "timeStorageClassUpdated": "2019-03-28T09:25:05.995Z",
  "size": "137256",
  "md5Hash": "1GmpUnGeiW+/KU+0U8c8Wg==",
  "mediaLink": "https://www.googleapis.com/download/storage/v1/b/roshanbucket/o/stones.jpg?generation=1553765105996166&alt=media",
  "crc32c": "FMaEGg==",
  "etag": "CIaj1InCpOECEAE="
}

程序执行一分钟后,它会打印对象上传帐户收到的所有消息,然后停止。要在一分钟后收到事件消息,我需要重新启动应用程序。现在,我想做的是连续 运行 监听器,所以,我尝试 运行 方法 eventListener() 在 main 方法的无限循环中,像这样

    public static void main(String[] args) throws InterruptedException {
    while(true) {
      eventListener();
    }
  }

有了这个,我似乎能够在每次上传后立即收到事件消息,无论我何时上传对象。但是,每隔一段时间,它就会抛出这个堆栈跟踪。

    Mar 28, 2019 12:56:34 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=6, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:440)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
    at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
    at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:260)
    at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
    at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
    at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:225)
    at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
    at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
    at listener.AsynchronousPull.eventListener(AsynchronousPull.java:57)
    at listener.AsynchronousPull.main(AsynchronousPull.java:74)

但是,它仍然在每次上传后打印消息,同时不时抛出堆栈跟踪。我对 threads 没有太多经验,非常感谢您帮助解决这个问题。

在一个紧密的循环中调用 eventListener() 并不是您想在这里做的。这将创建许多订阅者的新实例,这些订阅者会接收每条存活 60 秒的消息。您想要的是让您创建的订阅者的单个实例一直存在到您想要关闭它的时间。通常,您可以通过创建订阅者并通过 awaitTerminated().

等待其终止来完成此操作

上面的代码将被修改成这样:

public static void eventListener() throws InterruptedException {
  MessageReceiver receiver = new MessageReceiver() {
    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      System.out.println("Received message: " + message.getData().toStringUtf8());
      consumer.ack();
    }
  };
  Subscriber subscriber = null;
  try {
    subscriber = Subscriber.newBuilder(subscription, receiver)
        .setCredentialsProvider(FixedCredentialsProvider.create(creds)).build();
    subscriber.addListener(new Subscriber.Listener() {
      @Override
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle failure. This is called when the Subscriber encountered a fatal error
        // and is
        // shutting down.
        System.err.println(failure);
      }
    }, MoreExecutors.directExecutor());
    subscriber.startAsync().awaitRunning();
    subscriber.awaitTerminated();
  } finally {
    if (subscriber != null) {
      subscriber.stopAsync().awaitTerminated();
    }
  }
}

public static void main(String[] args) throws InterruptedException {
  eventListener();
}

如果您只是希望订阅者在应用程序终止时停止并且不想进行任何额外的清理,那么上面的代码将起作用,允许订阅者 运行 并接收消息直到发生错误或应用程序关闭。如果你想对应用程序的干净终止做一些清理,例如,你想确保任何消息已经被 receiveMessage 运行 完成处理,那么你可以 attach a shutdown hook 来捕获这样的终止(尽管它不会 运行 在所有情况下)。在这个钩子中,你会调用 stopAsync()。例如,您可以在 try 块之前插入以下内容:

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
      subscriber.stopAsync().awaitTerminated();
    }
});