Google Pub/Sub: Java pull sample client hangs

I am using Google Pub/Sub to pull messages asynchronously from a subscription.

I am using the sample code below:

    public void startStreaming() {
        Subscriber subscriber = null;
        try {
            SubscriptionName subscription = SubscriptionName.of("topic-test",
                    "subscription-id-1234");
            ExecutorProvider executorProvider =
                    InstantiatingExecutorProvider.newBuilder()
                            .setExecutorThreadCount(1)
                            .build();

            MessageReceiver receiver =
                    new MessageReceiver() {
                        @Override
                        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                            try {
                                String msg = message.getData().toStringUtf8();
                                System.out.println(msg);
                            } catch (Exception e) {
                                // Keep the original exception as the cause so the stack trace is not lost.
                                throw new RuntimeException("Failed to pull messages from topic", e);
                            }
                            consumer.ack();
                        }
                    };
            subscriber = Subscriber
                    .newBuilder(subscription, receiver)
                    .setExecutorProvider(executorProvider)
                    .build();
            subscriber.addListener(
                    new Subscriber.Listener() {
                        @Override
                        public void failed(Subscriber.State from, Throwable failure) {
                            // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
                            logger.error("Subscriber is not able to hook on google pubsub topic", failure);
                        }
                    },
                    MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();
            // why should I put sleep here??
            Thread.sleep(60000);
        } catch (Exception e) {
            // Log instead of silently swallowing the exception.
            logger.error("Pub/Sub subscriber failed", e);
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }

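It works fine for a few minutes (it receives messages), then gets stuck and stops receiving messages. I am sure there are messages waiting, because as soon as I restart the pull client it reads the data normally.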

Nov 12, 2017 7:33:17 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection sendAckOperations
WARNING: failed to send acks
java.lang.IllegalStateException: call was cancelled
    at com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:405)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
    at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:286)
    at com.google.cloud.pubsub.v1.StreamingSubscriberConnection.sendAckOperations(StreamingSubscriberConnection.java:274)
    at com.google.cloud.pubsub.v1.MessageDispatcher.processOutstandingAckOperations(MessageDispatcher.java:602)
    at com.google.cloud.pubsub.v1.MessageDispatcher.access00(MessageDispatcher.java:56)
    at com.google.cloud.pubsub.v1.MessageDispatcher$AckDeadlineAlarm.run(MessageDispatcher.java:529)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

When I do not use the ExecutorProvider executorProvider, I get the following log:

2017-11-12 19:08:43 [grpc-default-worker-ELG-2-3] DEBUG io.grpc.netty.NettyClientHandler [id: 0xdc0e7c4d, L:/10.8.17.4:47347 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND WINDOW_UPDATE: streamId=3 windowSizeIncrement=1048576
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-4] DEBUG io.grpc.netty.NettyClientHandler [id: 0x330498ce, L:/10.8.17.4:47346 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes=
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-3] DEBUG io.grpc.netty.NettyClientHandler [id: 0xdc0e7c4d, L:/10.8.17.4:47347 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes=
2017-11-12 19:08:44 [threadDeathWatcher-1-1] DEBUG io.netty.buffer.PoolThreadCache Freed 3 thread-local buffer(s) from thread: threadDeathWatcher-1-1
2017-11-12 19:08:44 [threadDeathWatcher-1-1] DEBUG io.netty.buffer.PoolThreadCache Freed 1 thread-local buffer(s) from thread: threadDeathWatcher-1-1
2017-11-12 19:08:44 [grpc-default-worker-ELG-2-3] DEBUG io.netty.buffer.PoolThreadCache Freed 153 thread-local buffer(s) from thread: grpc-default-worker-ELG-2-3
2017-11-12 19:08:44 [grpc-default-worker-ELG-2-4] DEBUG io.netty.buffer.PoolThreadCache Freed 131 thread-local buffer(s) from thread: grpc-default-worker-ELG-2-4

Can you help?

It works fine now after removing the lines below:

//            if (subscriber != null) {
//                subscriber.stopAsync().awaitTerminated();
//            }
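
The effect of those lines can be illustrated without the Pub/Sub client at all. The following is a minimal sketch, not the library's implementation: `FakeSubscriber`, `SubscriberLifecycleSketch`, and `run` are hypothetical names, and an in-memory queue stands in for the subscription. It shows that a consumer stopped shortly after start-up (as the `Thread.sleep` followed by `stopAsync()` in `finally` does) leaves later messages undelivered, while one that is kept running receives them.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class SubscriberLifecycleSketch {

    /** Hypothetical stand-in for a streaming subscriber; NOT the Pub/Sub client. */
    static final class FakeSubscriber {
        private final BlockingQueue<String> backlog;
        private final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch expected;
        private final Thread worker;

        FakeSubscriber(BlockingQueue<String> backlog, int expectedMessages) {
            this.backlog = backlog;
            this.expected = new CountDownLatch(expectedMessages);
            this.worker = new Thread(this::pullLoop, "fake-subscriber");
        }

        private void pullLoop() {
            try {
                while (true) {
                    received.add(backlog.take()); // blocks until a message arrives
                    expected.countDown();
                }
            } catch (InterruptedException stopped) {
                // The subscriber was stopped: exit, leaving later messages in the backlog.
            }
        }

        void start() {
            worker.start();
        }

        /** Waits for the expected number of messages, then stops the worker thread. */
        void awaitExpectedThenStop() {
            try {
                expected.await();
                worker.interrupt();
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        int receivedCount() {
            return received.size();
        }
    }

    /** Returns {messages received, messages left undelivered in the backlog}. */
    static int[] run(boolean stopEarly) {
        BlockingQueue<String> backlog = new LinkedBlockingQueue<>();
        FakeSubscriber subscriber = new FakeSubscriber(backlog, stopEarly ? 3 : 5);
        subscriber.start();
        for (int i = 1; i <= 3; i++) {
            backlog.add("msg-" + i);
        }
        if (stopEarly) {
            subscriber.awaitExpectedThenStop(); // like sleep(...) then stop in finally
        }
        for (int i = 4; i <= 5; i++) {
            backlog.add("msg-" + i); // published after the early stop
        }
        if (!stopEarly) {
            subscriber.awaitExpectedThenStop(); // stop only once the work is done
        }
        return new int[] {subscriber.receivedCount(), backlog.size()};
    }

    public static void main(String[] args) {
        int[] early = run(true);
        int[] late = run(false);
        System.out.println("stopped early: received=" + early[0] + " backlog=" + early[1]);
        System.out.println("kept running:  received=" + late[0] + " backlog=" + late[1]);
    }
}
```

With the real client, the equivalent idea is presumably to treat the `Subscriber` as a long-lived object: start it once and call `stopAsync()` only when the application itself shuts down, rather than inside the method that starts it.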

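The explanation is in this GitHub thread: 1827

That does make sense. What was missing is documentation: the google-cloud docs should give more of the detail that appears in the comments on that GitHub thread.

Edit: A GitHub ticket was created to add more documentation, and it has since been closed: 2616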