如何使用 signalr 以正确的方式处理更新?

How to handle updates in the right way using signalr?

我有一个客户端应用程序 Angular 和一个 signalR 集线器,还有一个将时间戳作为参数的服务。

我想在客户端按下开始按钮时调用集线器中的方法,当调用该方法时我想继续列出所有更改(创建计时器)直到客户端按下停止按钮然后我将停止计时器。

所以想问一下哪个更好:

1- 使用时间戳从客户端调用调用的方法,然后创建一个 setInterval 来调用其中的方法,当按下停止按钮时我可以停止它。

优点: 启动和停止计时器很容易。

缺点: 我每 1 秒调用一次该方法,然后检查客户端是否有更新 UI.

的响应

2- 调用该方法一次,然后为服务器上的每个客户端创建一个计时器,当客户端按下停止按钮时,我可以调用另一个方法来停止该客户端的计时器。

优点: 我正在检查集线器中的时间戳,只有当来自服务的时间戳 > 本地时间戳

时,我才会将数据发送到客户端

缺点: 我实际上不知道如何为每个客户端创建一个计时器,所以如果这是正确的方法请帮助我

选项 #1 不可用,因为 SignalR 的存在消除了轮询的需要。频繁的轮询也不会扩展。如果每个客户端每 1 秒轮询一次服务器,网站最终将白白支付 CPU 的 lot 和带宽。商务人士也不喜欢频繁轮询,因为所有托管商和云提供商都对出口收费。

SignalR streaming examples 使用定时通知作为使用 IAsyncEnumerable<T> 的流式通知的简单示例。在最简单的示例中,计数器每 delay 毫秒递增一次:

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

客户端可以通过所需的延迟调用此操作并开始接收通知。 SignalR 知道这是一个通知流,因为它 returns IAsyncEnumerable.

下一个更高级的示例使用 Channels 允许发布者方法 WriteItemsAsync 向中心发送通知流。

动作本身更简单,它只是 returns 频道的 reader:

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

发布者方法写入 ChannelWriter 而不是返回 IAsyncEnumerable :

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }

    writer.Complete(localException);
}

这个方法很容易在不同的class。只需将 ChannelWriter 传递给发布者即可。

您正在使用 SignalR 进行实时数据通信。每秒调用一个方法只是在SignalR脸上开玩笑...所以这不是解决方案

最好的解决方案是使用群组功能。

示例:

  1. 您的开始按钮会将用户添加到组中。
  2. 当您的用户在群组中时,它将收到您需要的所有数据。 await this.Clients.Group("someGroup").BroadcastMessage(message);
  3. 您的停止按钮会将用户从组中删除,因此它将不再接收数据。

集线器上的一些代码示例:

public async Task Start()
{
    // Add user to the data group

    await this.Groups.AddToGroupAsync(this.Context.ConnectionId, "dataGroup");
}

public async Task Stop()
{
    // Add user to the data group

    await this.Groups.RemoveFromGroupAsync(this.Context.ConnectionId, "dataGroup");
}

向按下开始的用户发送数据并接收实时数据的 Worker 示例。

private readonly IHubContext<SignalRHub, ISignalRHub> hub;

private readonly IServiceProvider serviceProvider;

public Worker(IServiceProvider serviceProvider, IHubContext<SignalRHub, ISignalRHub> hub)
{
    this.serviceProvider = serviceProvider;
    this.hub = hub;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        await this.hub.Clients.Group("dataGroup").BroadcastMessage(DataManager.GetData());

        this.Logger.LogDebug("Sent data to all users at {0}", DateTime.UtcNow);

        await Task.Delay(1000, stoppingToken);
    }
}

PS:如果你有工人,我假设你有一些经理来获取数据或发送给用户的东西。

编辑: 如果你不想使用 worker,你总是可以只使用定时器:

public class TimerManager
{
    private Timer _timer;
    private AutoResetEvent _autoResetEvent;
    private Action _action;
    public DateTime TimerStarted { get; }

    public TimerManager(Action action)
    {
        _action = action;
        _autoResetEvent = new AutoResetEvent(false);
        _timer = new Timer(Execute, _autoResetEvent, 1000, 2000);
        TimerStarted = DateTime.Now;
    }

    public void Execute(object stateInfo)
    {
        _action();
        if((DateTime.Now - TimerStarted).Seconds > 60)
        {
            _timer.Dispose();
        }
    }
}

然后在类似的地方使用它:

 var timerManager = new TimerManager(() => this.hub.Clients.Group("dataGroup").BroadcastMessage(DataManager.GetData()));