List.Add 未完美添加并行和异步

List.Add not added perfectly with Parallel and async

我有一个 Blazor WASM 应用程序,它需要在不阻塞 UI 的情况下每秒调用一个 API。此代码演示了我是如何尝试做到这一点的:

List<int> testList = new();
testList.Add(1);
testList.Add(2);
testList.Add(3);
testList.Add(4);

List<int> emptyTestlist = new();

CancellationTokenSource cts;

Test();

void Test()
{
    Parallel.Invoke(async () =>
    {
        do
        {
            Console.WriteLine("Start");
            await Task.Delay(1000);
            await Test2();
            Console.WriteLine("END");
        } while (true);
    });
}
Console.ReadLine();

async ValueTask Test2()
{
    emptyTestlist.Clear();
    cts = new();
    await Parallel.ForEachAsync(testList, cts.Token, async (test, token) =>
    {
        await Test4(test);
    });
    foreach (var test in emptyTestlist)
    {
        await Test3(test);
    }
}

async Task Test4(int i)
{
    await Task.Delay(300);
    //Console.WriteLine("if I Add this console.WriteLine It's added perfectly");
    emptyTestlist.Add(i);
    Console.WriteLine($"from TEST4: {i}");
}

async Task Test3(int i)
{
    Console.WriteLine($"TEST3 {i}.");
    await Task.Delay(1000);
    Console.WriteLine($"TEST3 {i}, after 1sec");
}

如果我对行 Console.WriteLine("if I Add this console.WriteLine It's added perfectly"); 进行注释,它的添加并不完美。 (emptyTestlist.Count 并不总是 4)。但是如果我在 emptyTestlist.Add(i) 之前添加 Console.WriteLine 它可以正常工作(emptyTestlist.Count 总是 4)。

不知道怎么解决。有什么问题吗?

轮询 API 的最简单方法是使用计时器:

@code {
    private List<Customer> custs=new List<Customer>();
    
    private System.Threading.Timer timer;

    protected override async Task OnInitializedAsync()
    {
        await base.OnInitializedAsync();
        custs = await Http.GetFromJsonAsync<List<Customer>>(url);

        timer = new System.Threading.Timer(async _ =>
        {
            custs = await Http.GetFromJsonAsync<List<Customer>>("/api/customers");
            InvokeAsync(StateHasChanged); 
        }, null, 1000, 1000);
    }

在这种情况下,需要 InvokeAsync(StateHasChanged);,因为状态是从计时器线程修改的,而 Blazor 不知道数据已更改。

如果我们想将结果添加到列表中,我们要么必须使用锁,要么必须使用线程安全的集合,例如 ConcurrentQueue.

@code {
    private ConcurrentQueue<Customer> custs=new ConcurrentQueue<Customer>();
    
    private System.Threading.Timer timer;

    protected override async Task OnInitializedAsync()
    {
        await base.OnInitializedAsync();
        custs = await Http.GetFromJsonAsync<List<Customer>>(url);

        timer = new System.Threading.Timer(async _ =>
        {
            var results = await Http.GetFromJsonAsync<List<Customer>>("/api/customers");
            foreach(var c in results)
            {
                custs.Enqueue(c);
            }
            InvokeAsync(StateHasChanged); 
        }, null, 1000, 1000);
    }

每秒轮询一次 API 以防万一有任何新数据不是很有效。最好让 API 使用 SignalR or Push Notifications

通知客户任何新数据

documentation example 借用这足以从服务器接收消息:

@code {
    private HubConnection hubConnection;
    private List<string> messages = new List<string>();
    private string userInput;
    private string messageInput;

    protected override async Task OnInitializedAsync()
    {
        hubConnection = new HubConnectionBuilder()
            .WithUrl(NavigationManager.ToAbsoluteUri("/chathub"))
            .Build();

        hubConnection.On<string, string>("ReceiveMessage", (user, message) =>
        {
            var encodedMsg = $"{user}: {message}";
            messages.Add(encodedMsg);
            StateHasChanged();
        });

        await hubConnection.StartAsync();
    }