Parallel.Invoke 任务繁忙时等待

Parallel.Invoke wait if tasks are busy

有人可以帮我写下面的代码吗?在行中:

Parallel.Invoke(parallelOptions, () => dosomething(message));

我想调用最多 5 个并行任务(如果有 5 个忙,等待下一个可用,然后启动它...如果只有 4 个忙,启动第 5 个,等等)

    private AutoResetEvent autoResetEvent1 = new AutoResetEvent(false);
    private ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 5 };

    private void threadProc()
    {
        queue.ReceiveCompleted += MyReceiveCompleted1;
        Debug.WriteLine("about to start loop");
        while (!shutdown)
        {
            queue.BeginReceive();
            autoResetEvent1.WaitOne();
        }
        queue.ReceiveCompleted -= MyReceiveCompleted1;
        queue.Dispose();
        Debug.WriteLine("we are done");
    }

    private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e)
    {
        var message = queue.EndReceive(e.AsyncResult);
        Debug.WriteLine("number of max tasks: " + parallelOptions.MaxDegreeOfParallelism);
        Parallel.Invoke(parallelOptions, () => dosomething(message));
        autoResetEvent1.Set();
    }

    private void dosomething(Message message)
    {
        //dummy body
        var i = 0;
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(1));
            i++;
            if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId);
            if (i == 15)
                break;
        }
        Debug.WriteLine("finished task");
    }

我现在得到的结果:

1) dosomething() 正如你在上面看到的那样,我一次只得到一个(等待)

2) dosomething() 更改为以下,我得到 none 停止 x 任务数(不受限制或遵守 MaxDegreeOfParallelism

    private async Task dosomething(Message message)
    {
        //dummy body
        var i = 0;
        while (true)
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            i++;
            if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId);
            if (i == 15)
                break;
        }
        Debug.WriteLine("finished task");
    }

为了实现我想要的目标,我做错了什么?

我想要的结果:

在"MyReceiveCompleted"中,我想确保只有5个并发任务在处理消息,如果有5个忙,等待一个可用。

这行代码:

Parallel.Invoke(parallelOptions, () => dosomething(message));

告诉 TPL 开始新的并行操作,只有一件事要做。所以,5 的 "max parallelism" 选项有点没有意义,因为只有一件事要做。 autoResetEvent1 确保一次只有一个并行操作,并且每个并行操作只有一件事要做,所以观察到的一次只有一件事 运行 的行为是完全可以预期的。

当您将委托更改为异步时,实际发生的是 MaxDegreeOfParallelism 仅应用于方法的 同步 部分。因此,一旦它达到第一个 await,它就会 "leaves" 并行操作,不再受其约束。

核心问题是 Parallel 在操作数量提前已知时效果最佳——您的代码不知道;它只是从队列中读取它们并在它们到达时进行处理。正如有人评论的那样,您 可以 使用动态任务并行解决此问题,并使用 TaskScheduler 限制并发。

但是,最简单的解决方案可能是 TPL Dataflow。您可以使用适当的限制选项创建一个 ActionBlock<T> 并在消息到达时向其发送消息:

private ActionBlock<string> block = new ActionBlock<string>(
    message => dosomething(message),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e)
{
  var message = queue.EndReceive(e.AsyncResult);
  block.Post(message);
  autoResetEvent1.Set();
}

TPL 数据流的一个优点是它还理解异步方法,并将 MaxDegreeOfParallelism 解释为最大并发度。