GRPC with C# and streaming response from server results in error: 'Shutdown has already been called'

GRPC with C# and streaming response from server results in error: 'Shutdown has already been called'

我正在尝试通过 GRPC 和 C# 使用服务器流,我的客户端收到一个响应然后抛出错误:

Shutdown has already been called

  at Grpc.Core.Internal.CompletionQueueSafeHandle.BeginOp()
   at Grpc.Core.Internal.CallSafeHandle.StartReceiveMessage(IReceivedMessageCallback callback)
   at Grpc.Core.Internal.AsyncCallBase`2.ReadMessageInternalAsync()
   at Grpc.Core.Internal.ClientResponseStream`2.<MoveNext>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at GRPCTransferClient.Program.<Test>d__1.MoveNext() in Program.cs:line 25

我尝试通过在服务器端和客户端设置延迟来限制响应,但我仍然遇到错误。但我注意到,如果我在尝试调用 MoveNext() 之前在客户端上延迟 1 秒,我什至不会得到第一个响应,它会立即抛出错误。

我的原型是这样的:

syntax = "proto3";

package Mega.SplitText;

service SplitText {
  rpc Split(Text) returns (stream Line) {}
}

message Text {
  string text = 1;
}

message Line {
  string line = 1;
}

服务器代码:

public class SplitTextServiceImpl : SplitTextBase
    {
        public SplitTextServiceImpl()
        {
        }

        public override async Task Split(Text request, IServerStreamWriter<Line> responseStream, ServerCallContext context)
        {
            foreach (string line in request.Text_.Split(';'))
            {
                await responseStream.WriteAsync(new Line { Line_ = line });                
                Console.WriteLine($"Sent {line}");
            }
        }
    }

客户代码:

class Program
    {
        static void Main(string[] args)
        {
            var program = new Program();
            program.Test();            
        }

        async void Test()
        {
            Channel channel = new Channel($"127.0.0.1:50051", ChannelCredentials.Insecure);
            var client = new SplitTextClient(channel);
            using (var response = client.Split(new Text { Text_ = "a;b;c;d;e" }))
            {
                while (await response.ResponseStream.MoveNext())
                {
                    Console.WriteLine(response.ResponseStream.Current);
                }
            }
            Console.ReadLine();
        }
    }

我从官方 GRPC 存储库的示例文件夹中复制了这个结构,在我的机器上运行良好,所以我不知道我在另一个项目中做错了什么。

项目都是.Net Core 2.0,但为了以防万一,我也尝试了.Net Framework 4.6.1。在这两种情况下,结果是相同的。

已解决,问题是我忘记等待客户端异步功能。解决方案:

static void Main(string[] args)
{
    var program = new Program();
    program.Test().Wait(); // Need to wait here
}

async Task Test()
{
    Channel channel = new Channel($"127.0.0.1:50051", ChannelCredentials.Insecure);
    var client = new SplitTextClient(channel);
    using (var response = client.Split(new Text { Text_ = "a;b;c;d;e" }))
    {
        await Task.Delay(1000);
        while (await response.ResponseStream.MoveNext())
        {
            Console.WriteLine(response.ResponseStream.Current);
        }
    }
    Console.ReadLine();
}