在运行时添加和删除 Parallel.Invoke 操作

Add and remove Parallel.Invoke actions during runtime

我正在使用 Parallel.Invoke 同时 运行 某些方法,并在所有方法完成后收集结果。

问题

正如您在“可怕的代码”部分看到的那样,操作列表被硬编码为三个元素,如果 detectedDevicesList.Count != 3.

将完全无用

尝试过的解决方案

我试图动态创建一个 Actions[] 数组并将其作为参数传递给 Parallel.Invoke,但我无法将现有方法转换为 Task,然后进入 Actions.

非工作代码

public async Task<Task<String>> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{        
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test cals failed";

    return Task.Run(async () =>
    {
        List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.getAsync();

        if (detectedDevicesList.Count == 0)
        {
            UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
            return TEST_CALLS_FAILED;
        }

        Console.WriteLine("Executing test calls...");

        List<Task<MobileEquipment>> results = new List<Task<MobileEquipment>>();

        //Horrible code begins...
        Parallel.Invoke(() =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[0], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[1], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[2], listBoxLog));
        });

        //Horrible code ends...
        foreach (Task<MobileEquipment> mobileEquipment in results)
        {
            UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.Result.serial + " " + mobileEquipment.Result.operador + ": " + mobileEquipment.Result.callSuccess, true);

            if (!mobileEquipment.Result.callSuccess && sendAlarms)
            {                      
                await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
            }
        }
                      

        UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);

        return TEST_CALLS_COMPLETED;
    });
}

编辑:READER 的有用信息和经验教训

根据收到的出色回答和评论,我添加了一些最初缺失的代码,它们可以帮助您安全地与来自并行任务的 Windows 表单对象进行交互。

public static void ListboxAddItem(ListBox listBox, String argText, Boolean useTimestamp)
    {
        String timeStamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

        if (useTimestamp)
        {
            argText = timeStamp + ": " + argText;
        }

        if (Thread.CurrentThread.IsBackground)
        {
            listBox.Invoke(new Action(() =>
            {
                listBox.Items.Add(argText);
                listBox.SelectedIndex = listBox.Items.Count - 1;
            }));
        }
        else
        {
            listBox.Items.Add(argText);
            listBox.SelectedIndex = listBox.Items.Count - 1;
        }
    }

此外,不要盲目遵循 IntelliSense 建议,以防止像 Task 这样的恶作剧,或在 C# 上使用类似 Java 的大小写。

很难选出最佳答案,因为它们都可以正常工作并且没有任何明显的性能差异(MakePhoneCall().call 自动 phone 调用连接了 Android 设备通过亚行)。检查哪个答案最适合您的特定应用。

Parallel.For or Parallel.Foreach and a concurrent collection。应该更合适:

ConcurrentStack<Task<MobileEquipment>> results = new ();
Parallel.Foreach(detectedDevicesList, d => results.Add(new MakePhoneCall().call(d, listBoxLog));

另一种选择是 parallel Linq

var result = detectedDevicesList.AsParallel(
    d => results.Add(new MakePhoneCall().call(d, listBoxLog).ToList();

不过,Call returns看起来像是一个任务,所以你确定这是一个慢阻塞调用?如果不是,最好使用常规循环来启动调用,并使用 Task.WaitAll 来 (a) 等待它们完成。看起来您当前的解决方案可能会在 mobileEquipment.Result.

上阻塞

还要注意 listBoxLog 看起来像一个 UI 对象,并且不允许从工作线程访问 UI 对象。如果方法是 'Pure' 并且对象是不可变的,那么使用后台线程进行处理会容易得多。 IE。避免可能不是线程安全的副作用。作为一般规则,我建议避免多线程编程,除非 a) 有充分的理由期待一些改进,b) 你很清楚 thread safety.

的危险

您也可以考虑使用 Dataflow 设置一个管道,以并行和异步的方式执行每个处理步骤。

您应该使用 Microsoft 的 Reactive Framework(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您所有丑陋的代码都会变成这样:

IObservable<MobileEquipment> query =
    from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
    from detectedDevice in detectedDevicesList.ToObservable()
    from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
    select mobileEquipment;

完整方法现在正确 returns Task<String>,而不是 Task<Task<String>>

这里是:

public async Task<String> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test cals failed";

    IObservable<MobileEquipment> query =
        from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
        from detectedDevice in detectedDevicesList.ToObservable()
        from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
        select mobileEquipment;
        
    IList<MobileEquipment> results = await query.ToList();

    if (results.Count == 0)
    {
        UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
        return TEST_CALLS_FAILED;
    }

    foreach (MobileEquipment mobileEquipment in results)
    {
        UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.serial + " " + mobileEquipment.operador + ": " + mobileEquipment.callSuccess, true);

        if (!mobileEquipment.callSuccess && sendAlarms)
        {
            await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }

    UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);

    return TEST_CALLS_COMPLETED;
}

Parallel.Invoke is not the correct tool to use in this case because your workload is asynchronous, and the Parallel.Invoke is not async-friendly. Your problem can be solved by just creating all the CallAsync tasks at once, and then await all of them to complete with the Task.WhenAll方法。在 await 之后,您回到了 UI 线程,您可以使用结果安全地更新 UI。

将检测到的设备投影到任务的便捷工具是 Select LINQ 运算符。

public static async Task CallWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.GetAsync();

    Task<MobileEquipment>[] tasks = detectedDevicesList
        .Select(device => new MakePhoneCall().CallAsync(device))
        .ToArray();

    MobileEquipment[] results = await Task.WhenAll(tasks);

    foreach (var mobileEquipment in results)
    {
        UpdateGui.ListboxAddItem(listBoxLog,
            $"Test call result for {mobileEquipment.Serial} {mobileEquipment.Operador}: {mobileEquipment.CallSuccess}", true);
    }

    foreach (var mobileEquipment in results)
    {
        if (!mobileEquipment.CallSuccess && sendAlarms)
        {
            await SendEmail.SendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }
}