gRPC 保持流活动
gRPC Keep stream alive
我正在尝试使用 gRPC 进行长期流式传输会话,因为我需要保证从服务器到客户端的消息顺序。
我有以下 .proto:
service Subscriber {
rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}
我当前的服务(托管在 ASP.NET / .NET 5.0 中)如下所示:
public class SubscriberService : Subscriber.SubscriberBase
{
private readonly ILogger<SubscriberService> _logger;
private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
private int _messageCount = 0;
private Timer _timer;
public SubscriberService(ILogger<SubscriberService> logger)
{
_logger = logger;
_timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
private void TimerCallback()
{
Broadcast($"Current time is {DateTime.UtcNow}");
}
public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
{
_subscriptions.TryAdd(request.ClientId, responseStream);
return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
public void Broadcast(string message)
{
var count = ++_messageCount;
foreach (var sub in _subscriptions.Values)
{
sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
}
_logger.LogInformation($"Broadcast message #{count}: {message}");
}
}
我的客户端只收到初始 'Subscribe Successful' 消息,但不会收到计时器触发的消息。调用 WriteAsync 时我没有得到任何异常。
我是在尝试将 gRPC 用于它从未被设计用来做的事情(SignalR/WebSocket 替代),还是我只是遗漏了一些明显的东西?
对于长运行 gRPC 流,您必须等待客户端说连接已关闭。像这样:
while (!context.CancellationToken.IsCancellationRequested)
{
// event-based action
responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
我认为之前的回答是忙等待。所以我想向您展示它的异步版本。
public override Task Subscribe(RequestMessage, IServerStreamWriter<ReplyMessage> responseStream, ServerCallContext context)
{
// your event-based code here
var tcs = new TaskCompletionSource();
context.CancellationToken.Register(() => tcs.TrySetCanceled(), false);
return tcs.Task;
}
顺便说一句,我想我一直在做一个和你一样的项目。我使用了 Rx.Net 中的 Observables、Subjects 和 ReplaySubjects。这些对于基于事件的代码非常有帮助。
我正在尝试使用 gRPC 进行长期流式传输会话,因为我需要保证从服务器到客户端的消息顺序。
我有以下 .proto:
service Subscriber {
rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}
我当前的服务(托管在 ASP.NET / .NET 5.0 中)如下所示:
public class SubscriberService : Subscriber.SubscriberBase
{
private readonly ILogger<SubscriberService> _logger;
private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
private int _messageCount = 0;
private Timer _timer;
public SubscriberService(ILogger<SubscriberService> logger)
{
_logger = logger;
_timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
private void TimerCallback()
{
Broadcast($"Current time is {DateTime.UtcNow}");
}
public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
{
_subscriptions.TryAdd(request.ClientId, responseStream);
return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
public void Broadcast(string message)
{
var count = ++_messageCount;
foreach (var sub in _subscriptions.Values)
{
sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
}
_logger.LogInformation($"Broadcast message #{count}: {message}");
}
}
我的客户端只收到初始 'Subscribe Successful' 消息,但不会收到计时器触发的消息。调用 WriteAsync 时我没有得到任何异常。
我是在尝试将 gRPC 用于它从未被设计用来做的事情(SignalR/WebSocket 替代),还是我只是遗漏了一些明显的东西?
对于长运行 gRPC 流,您必须等待客户端说连接已关闭。像这样:
while (!context.CancellationToken.IsCancellationRequested)
{
// event-based action
responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
我认为之前的回答是忙等待。所以我想向您展示它的异步版本。
public override Task Subscribe(RequestMessage, IServerStreamWriter<ReplyMessage> responseStream, ServerCallContext context)
{
// your event-based code here
var tcs = new TaskCompletionSource();
context.CancellationToken.Register(() => tcs.TrySetCanceled(), false);
return tcs.Task;
}
顺便说一句,我想我一直在做一个和你一样的项目。我使用了 Rx.Net 中的 Observables、Subjects 和 ReplaySubjects。这些对于基于事件的代码非常有帮助。