i.grpc.internal.AbstractClientStream - 在关闭的流上接收到的数据意味着

i.grpc.internal.AbstractClientStream - Received data on closed stream meaning

我有一个 Spring 引导应用程序 (v2.2.10.RELEASE),它订阅了 pubSub 中的多个主题并提取异步数据并将其发送到其他地方。我没有使用 SpringGCP,只使用原生 google 库

这是我的订阅者设置:

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
            (PubsubMessage message, AckReplyConsumer consumer) -> {
                messages.add(message);
                consumer.ack();
            };

    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
                                    .setParallelPullCount(2)
                                    .setFlowControlSettings(flowControlSettings)
                                    .setCredentialsProvider(credentialsProvider)
                                    .setExecutorProvider(executorProvider)
                                     //.setChannelProvider()
                                    .build();

在高流量和大消息 (2 - 4 kb) 的情况下,我遇到了这条信息消息:

[grpc-default-worker-ELG-1-1] INFO  i.grpc.internal.AbstractClientStream - Received data on closed stream

首先,我不完全明白那是什么意思?我注意到的是,当发生这种情况时,传递的重复消息会增加。所以我认为这意味着 pubSub 试图通过一些消息到达订阅者,但由于某种原因订阅者还没有准备好,所以 pubSub 将尝试再次传递消息。因此重复次数更多,对吗?

使用订阅者中的 TransportChannelProvider 可以解决这个问题吗?我对写得不好的文档的理解是,当当前使用的通道关闭时,这将创建一个新的传递通道,从而摆脱以前的日志消息。

如果是,如何定义频道目标字符串?我在哪里可以找到 mangagedChannel 的符合 NameResolver 的 URI。我的意思是这个片段:

    private TransportChannelProvider getChannelProvider() {
    ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext(true).build();
    return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
}

我是 GCP 的新手,如果我的问题不够连贯,请见谅

使用自定义 TransportChannelProvider 无法解决此类问题。这更有可能是堆栈更深层的问题,例如,在 gRPC 级别。此类错误存在一些未解决的问题 [1, 2]

关于导致重复的原因,消息可能是通过已经关闭的流(与错误消息一致)传递的,因为它们被困在较低级别的缓冲区中gRPC 层,因此最终成为随后通过另一个流传递和处理的消息的副本。这可能是 large backlogs of small messages 文档中讨论的问题的一个版本。 Java 客户端库的 v1.109.0 中修复了此问题,因此如果您使用的是比该版本更早的版本,则值得更新。

如果重复仍然是个问题,最好 reach out to support 提供您的订阅名称和一些重复消息的消息 ID,以便他们可以查看这些消息的传递模式消息并进一步诊断这些重新投递是否意外。