如果从单独的线程调用,则 ResponseStream 和 RequestStream 永远不会继续
ResponseStream and RequestStream Never Continue if Called From Separate Threads
聊天示例 (.Net Framework 4.x) 包含交错读取和写入相同 AsyncDuplexStreamingCall
的代码。我更改了 .Net Core Greeter 示例以包含流式 RPC:
service GreetingService {
rpc Greeting(HelloRequest) returns (HelloResponse);
rpc StreamGreeting(stream HelloRequest) returns (stream HelloResponse);
}
然后我基本上复制了服务器的 echo 实现:
public override async Task StreamGreeting(/* ... */)
{
while (await requestStream.MoveNext(CancellationToken.None))
{
var request = requestStream.Current;
Console.WriteLine($"Stream Message: {request.Name}");
await responseStream.WriteAsync(new HelloResponse { Greeting = "Stream Hello " + request.Name });
}
Console.WriteLine("Stream completed.");
}
最后,我尝试从不同的线程读取和写入此流:
var callResult = client.StreamGreeting(new CallOptions());
// Or: Task.Run
new Thread(async () =>
{
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.CompleteAsync();
}).Start();
new Thread(async () =>
{
while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
{
Console.WriteLine(callResult.ResponseStream.Current.Greeting);
}
}).Start();
第一次调用 WriteAsync
成功(服务器记录它收到消息),但第二次调用从未成功 returns/continues。 ResponseStream.MoveNext
从未 returns/continues。 运行 无论是在主 task/thread 上都没有解决问题。 运行 两者在主 task/thread 上都有效,包括所有形式的交错调用。
我是不是做错了什么,或者这是一个限制?
我可以重现您描述的结果,但前提是在发出处理线程后立即包含 GreeterClient
示例中的行 channel.ShutdownAsync().Wait()
。
这会导致 InvalidOperationException
,因为在进行流式调用时客户端存根会关闭。
为确保服务器上处理所有四个调用,您需要等待所有流式传输请求和响应完成后再关闭:
var callResult = client.StreamGreeting(new CallOptions());
Task.WhenAll(new[]
{
Task.Run(async () =>
{
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.CompleteAsync();
}),
Task.Run(async () =>
{
while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
{
Console.WriteLine(callResult.ResponseStream.Current.Greeting);
}
})
}).Wait();
聊天示例 (.Net Framework 4.x) 包含交错读取和写入相同 AsyncDuplexStreamingCall
的代码。我更改了 .Net Core Greeter 示例以包含流式 RPC:
service GreetingService {
rpc Greeting(HelloRequest) returns (HelloResponse);
rpc StreamGreeting(stream HelloRequest) returns (stream HelloResponse);
}
然后我基本上复制了服务器的 echo 实现:
public override async Task StreamGreeting(/* ... */)
{
while (await requestStream.MoveNext(CancellationToken.None))
{
var request = requestStream.Current;
Console.WriteLine($"Stream Message: {request.Name}");
await responseStream.WriteAsync(new HelloResponse { Greeting = "Stream Hello " + request.Name });
}
Console.WriteLine("Stream completed.");
}
最后,我尝试从不同的线程读取和写入此流:
var callResult = client.StreamGreeting(new CallOptions());
// Or: Task.Run
new Thread(async () =>
{
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.CompleteAsync();
}).Start();
new Thread(async () =>
{
while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
{
Console.WriteLine(callResult.ResponseStream.Current.Greeting);
}
}).Start();
第一次调用 WriteAsync
成功(服务器记录它收到消息),但第二次调用从未成功 returns/continues。 ResponseStream.MoveNext
从未 returns/continues。 运行 无论是在主 task/thread 上都没有解决问题。 运行 两者在主 task/thread 上都有效,包括所有形式的交错调用。
我是不是做错了什么,或者这是一个限制?
我可以重现您描述的结果,但前提是在发出处理线程后立即包含 GreeterClient
示例中的行 channel.ShutdownAsync().Wait()
。
这会导致 InvalidOperationException
,因为在进行流式调用时客户端存根会关闭。
为确保服务器上处理所有四个调用,您需要等待所有流式传输请求和响应完成后再关闭:
var callResult = client.StreamGreeting(new CallOptions());
Task.WhenAll(new[]
{
Task.Run(async () =>
{
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.CompleteAsync();
}),
Task.Run(async () =>
{
while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
{
Console.WriteLine(callResult.ResponseStream.Current.Greeting);
}
})
}).Wait();