Periodic connection reset while grpc streaming

Server:

override fun subscribe(request: Subscribe, responseObserver: StreamObserver<SubscriptionEvent>) {
    sessionStore.grpcHandler(responseObserver, request.sessionId) { session ->
        eventStream.stream(session.id)
            .doOnNext {
                try {
                    // Kotlin requires a type argument on the cast to the generic ServerCallStreamObserver
                    if ((responseObserver as ServerCallStreamObserver<SubscriptionEvent>).isCancelled) {
                        log.debug { "Stopping to stream events, seems like client cancelled it" }
                        responseObserver.onCompleted()
                        return@doOnNext
                    }

                    responseObserver.onNext(it)
                } catch (e: StatusRuntimeException) {
                    log.error("Could not stream an event", e)
                }
            }
            .doOnError { throwable ->
                log.error("Subscription failed", throwable)
            }
            .subscribe()
    }
}

Client:

fun subscribe(sessionId: String, tenantId: String, botId: String) {
    subscriptionsThreadPool.submit {
        try {
            subscriptionService.withDeadlineAfter(Long.MAX_VALUE, TimeUnit.SECONDS).subscribe(
                Subscribe.newBuilder().setSessionId(sessionId).build(),
                SubscribeStreamObserver(sessionId, tenantId, botId)
            )

            finishLatch.await()
        } catch (e: Throwable) {
            log.error("Could not subscribe to connector-service", e)
        }
    }
}

The server uses https://github.com/LogNet/grpc-spring-boot-starter.

Client-side Netty configuration (worth mentioning: there is no proxy in front of the gRPC server):

private fun rpcChannel(): ManagedChannel =
    NettyChannelBuilder
        .forTarget(properties.connectorServiceUrl)
        .usePlaintext()
        .build()

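As soon as I start a client subscription (i.e. call the subscribe method, which streams events), it takes at most 4 minutes before it fails with an UNAVAILABLE: Connection reset exception. It is always around 3-4 minutes. I tried setting every Netty configuration property I could think of, but nothing helped. Here are the logs: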

Server:

2021-07-06 11:56:26.045 DEBUG [/] [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler         : Connection Error

java.io.IOException: Connection reset by peer
    at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.read(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.read(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.read(Unknown Source)
    at java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Unknown Source)

2021-07-06 11:56:26.045 DEBUG [/] [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler         : [id: 0x1f9596dc, L:/172.17.0.110:8081 - R:/46.5.255.46:58262] OUTBOUND GO_AWAY: lastStreamId=2147483647 errorCode=2 length=24 bytes=436f6e6e656374696f6e2072657365742062792070656572
2021-07-06 11:56:26.046 DEBUG [/] [-worker-ELG-3-1] i.g.n.NettyServerTransport.connections   : Transport failed

Client:

2021-07-06 13:56:25.996 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : Caught a connection error

java.net.SocketException: Connection reset
    at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:367)
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:398)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)

2021-07-06 13:56:26.008 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : [id: 0x52cea98f, L:/192.168.178.20:57940 - R:/116.202.155.130:30192] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=2 length=16 bytes=436f6e6e656374696f6e207265736574
2021-07-06 13:56:26.013 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : Network channel is closed

The io.grpc version is 1.37.0.

Any ideas?

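It sounds like some device on the network path is terminating the connection after it has been idle for a while. It could be a proxy, a NAT, or a firewall.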
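The idle-timeout explanation can be made concrete with a toy model. It represents a NAT or firewall entry that is dropped after a fixed period with no packets. It is not a real device or library API. The class `IdleTimeoutModel` is hypothetical, and the 240-second timeout used below is an assumption chosen to match the observed 3-4 minutes.

```java
import java.util.List;

/**
 * Hypothetical model of a NAT/firewall entry that is discarded after
 * idleTimeoutSeconds without any packets. Illustration only, not a real API.
 */
final class IdleTimeoutModel {

    /**
     * Returns the time (in seconds) at which the device drops the connection,
     * or -1 if the connection survives until endSeconds.
     *
     * @param idleTimeoutSeconds how long the device tolerates silence
     * @param activitySeconds    sorted timestamps of packets on the connection
     * @param endSeconds         end of the observation window
     */
    static long dropTime(long idleTimeoutSeconds, List<Long> activitySeconds, long endSeconds) {
        long last = 0; // connection established at t = 0
        for (long t : activitySeconds) {
            if (t - last > idleTimeoutSeconds) {
                // The entry expired before this packet could refresh it.
                return last + idleTimeoutSeconds;
            }
            last = t;
        }
        // Check the silent tail between the last packet and the end of the window.
        return endSeconds - last > idleTimeoutSeconds ? last + idleTimeoutSeconds : -1;
    }
}
```

Consider a stream that carries no events for 10 minutes. Here `dropTime(240, List.of(), 600)` returns 240, so the connection is cut after 4 minutes. In contrast, `dropTime(240, List.of(150L, 300L, 450L, 600L), 600)` returns -1, because some traffic every 150 s keeps the entry alive.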

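If you can identify that device you may be able to reconfigure it, but usually you have little control over how such devices are configured.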

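gRPC supports keepalive, which is designed for exactly this situation. After a period of inactivity, gRPC generates some traffic (an HTTP/2 PING) just to make sure the connection is still working and to tell network devices that the connection is still in use.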
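Below is a simplified sketch of when client keepalive pings would fire, under one assumption: a PING is sent whenever `keepAliveTime` passes without any inbound data, and its ACK counts as activity. `KeepAliveSchedule` is a hypothetical helper, not part of gRPC. It ignores `keepAliveTimeout`, timer jitter and other transport internals.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of client-side keepalive: every time
 * keepAliveTimeSeconds passes with no inbound activity, a PING is sent,
 * and its ACK resets the idle clock. Not gRPC's actual implementation.
 */
final class KeepAliveSchedule {

    /** Returns the times (seconds) at which pings are sent before endSeconds. */
    static List<Long> pingTimes(long keepAliveTimeSeconds, List<Long> inboundSeconds, long endSeconds) {
        List<Long> pings = new ArrayList<>();
        List<Long> events = new ArrayList<>(inboundSeconds);
        events.add(endSeconds); // sentinel so the silent tail is also covered
        long last = 0; // stream opened at t = 0
        for (long t : events) {
            // Send a ping for every full keepAliveTime of silence before t.
            while (t - last > keepAliveTimeSeconds) {
                last += keepAliveTimeSeconds;
                pings.add(last);
            }
            last = t; // inbound data resets the idle clock
        }
        return pings;
    }
}
```

With `keepAliveTime` = 150 s and a completely silent stream, `pingTimes(150, List.of(), 600)` yields `[150, 300, 450]`. If one event arrives at t = 100 s, the schedule shifts to `[250, 400, 550]`. In both cases the connection is never silent for more than 150 s.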

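Keepalive can be configured on either the client or the server. If the network device is part of the server's deployment, it is best to let the server manage keepalive. The client has a non-routable IP (192.168.178.20 in the client log), so I expect the problem in this case is a NAT in front of the client. That makes configuring keepalive on the client the better choice, since different clients may have different needs.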
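The NAT diagnosis can be checked against the logs. The client log shows its local address as 192.168.178.20, a private (RFC 1918) address, and port 57940. The server log shows the peer as 46.5.255.46:58262. The sketch below uses only `java.net.InetAddress`. The class `NatCheck` and its methods are hypothetical, written for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Hypothetical helper comparing the client's own address (client log) with
 * the remote address the server observed (server log).
 */
final class NatCheck {

    /** True for 10/8, 172.16/12 and 192.168/16 (site-local IPv4). */
    static boolean isPrivate(String ipLiteral) {
        try {
            // For an IP literal, getByName only parses it; no DNS lookup is done.
            return InetAddress.getByName(ipLiteral).isSiteLocalAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not an IP literal: " + ipLiteral, e);
        }
    }

    /** A private client address that the server sees as something else suggests NAT. */
    static boolean looksTranslated(String clientLocalIp, String remoteIpSeenByServer) {
        return isPrivate(clientLocalIp) && !clientLocalIp.equals(remoteIpSeenByServer);
    }
}
```

With the addresses from the question, `looksTranslated("192.168.178.20", "46.5.255.46")` returns true. This is consistent with a NAT device between the client and the server rewriting the client's address (and port).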

        NettyChannelBuilder
            .forTarget(properties.connectorServiceUrl)
            .usePlaintext()
            // Enable keepalive, with a time a bit smaller
            // than the observed resets
            .keepAliveTime(150, TimeUnit.SECONDS)
            .build()

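To prevent abuse, gRPC servers by default do not allow keepalive pings more often than every 5 minutes. A client that pings faster is disconnected with a GOAWAY (ENHANCE_YOUR_CALM, "too_many_pings"), so you also need to change your server. I haven't used grpc-spring-boot-starter, but based on its documentation it seems you would use: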

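import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder; // io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder if you use grpc-netty-shaded
import java.util.concurrent.TimeUnit;
import org.lognet.springboot.grpc.GRpcServerBuilderConfigurer;
import org.springframework.stereotype.Component;

@Component
public class MyGRpcServerBuilderConfigurer extends GRpcServerBuilderConfigurer {
    @Override
    public void configure(ServerBuilder<?> serverBuilder) {
        // Accept client keepalive pings as often as every 150 s (default minimum: 5 minutes)
        ((NettyServerBuilder) serverBuilder)
            .permitKeepAliveTime(150, TimeUnit.SECONDS);
    }
}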
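The client and server settings have to agree with each other and with the network. The client's `keepAliveTime` should be shorter than the idle timeout of the device dropping the connection. It must also be no shorter than the server's `permitKeepAliveTime`. `KeepAliveConfigCheck` is a hypothetical sanity check, not part of gRPC. The 3-minute idle timeout used below is an assumption taken from the lower end of the resets reported in the question.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical consistency check for the keepalive settings discussed above.
 * Returns human-readable problems; an empty list means the values fit together.
 */
final class KeepAliveConfigCheck {

    static List<String> problems(Duration clientKeepAliveTime,
                                 Duration serverPermitKeepAliveTime,
                                 Duration observedIdleTimeout) {
        List<String> problems = new ArrayList<>();
        // Pings must arrive before the NAT/firewall forgets the connection.
        if (clientKeepAliveTime.compareTo(observedIdleTimeout) >= 0) {
            problems.add("keepAliveTime is not shorter than the observed idle timeout");
        }
        // Pings faster than the server permits are treated as abuse.
        if (clientKeepAliveTime.compareTo(serverPermitKeepAliveTime) < 0) {
            problems.add("keepAliveTime is shorter than the server's permitKeepAliveTime");
        }
        return problems;
    }
}
```

With 150 s on both sides and a 3-minute idle timeout, `problems(...)` returns an empty list. Leaving the server at its 5-minute default produces one problem, which is the case where the server would reject the client's pings.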