gRPC 保持流活动

gRPC Keep stream alive

我正在尝试使用 gRPC 进行长期流式传输会话,因为我需要保证从服务器到客户端的消息顺序。

我有以下 .proto:

service Subscriber {
    rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}

我当前的服务(托管在 ASP.NET / .NET 5.0 中)如下所示:

public class SubscriberService : Subscriber.SubscriberBase
{
    private readonly ILogger<SubscriberService> _logger;
    private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
    private int _messageCount = 0;
    private Timer _timer;

    public SubscriberService(ILogger<SubscriberService> logger)
    {
        _logger = logger;
        _timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }

    private void TimerCallback()
    {
        Broadcast($"Current time is {DateTime.UtcNow}");
    }

    public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
    {
        _subscriptions.TryAdd(request.ClientId, responseStream);
        return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
    }

    public void Broadcast(string message)
    {
        var count = ++_messageCount;
        foreach (var sub in _subscriptions.Values)
        {
            sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
        }
        _logger.LogInformation($"Broadcast message #{count}: {message}");
    }
}

我的客户端只收到初始 'Subscribe Successful' 消息,但不会收到计时器触发的消息。调用 WriteAsync 时我没有得到任何异常。

我是在尝试将 gRPC 用于它从未被设计用来做的事情(SignalR/WebSocket 替代),还是我只是遗漏了一些明显的东西?

对于长运行 gRPC 流,您必须等待客户端说连接已关闭。像这样:

while (!context.CancellationToken.IsCancellationRequested)
{
    // event-based action
    responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});  
}

我认为之前的回答是忙等待。所以我想向您展示它的异步版本。

public override Task Subscribe(RequestMessage, IServerStreamWriter<ReplyMessage> responseStream, ServerCallContext context)
 {
    // your event-based code here
    
    var tcs = new TaskCompletionSource();
    context.CancellationToken.Register(() => tcs.TrySetCanceled(), false);
    return tcs.Task;
 }

顺便说一句,我想我一直在做一个和你一样的项目。我使用了 Rx.Net 中的 Observables、Subjects 和 ReplaySubjects。这些对于基于事件的代码非常有帮助。