C# Parallel.Foreach...未知线程数的多线程

C# Parallel.Foreach...Multi theading with unknown number of threads

我的每个企业都有一个必须 运行 的同步过程。商家数量千变万化。

我已经阅读了有关线程 class、并行性等的文档...我不确定我是否理解没有 knowning/naming 预定义线程数的情况下如何做到这一点.. .在这种情况下,该数字是未知的。出于这个原因,我发现 Parallel.ForEach...因为我希望 运行 未知数量的同时操作

我的同步操作 运行 每 10 分钟一次。他们每个人最多需要一两分钟才能 运行。显然,我不能 运行 他们迭代,因为当他们完成时,下一个调用将被触发。

我想 运行 在单独的线程中同时处理它们。虽然他们每个人都应该有唯一的 API 键,但他们不共享内存或数据,也不会修改任何共享数据。

为此,我研究了如何进行多线程...我认为 Parallel.ForEach 可以解决问题...

我需要语法方面的帮助...

这是在 Worker Service 中...我有一个名为 SyncBusiness(int businessId) 的私有方法,它调用同步业务的 API 端点。简单..只需要调用方法的帮助?

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var businessIds = (from x in _db.Poslookup
                       select x.BusinessId).Distinct();

    while (!stoppingToken.IsCancellationRequested)
    {
        // Want to multi-thread a sync for each of the businesses in businessIds
        Parallel.ForEach(businessIds, i => { 
            await SyncBusiness(i)
        });

        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
    }
}

此外,请评论关于可伸缩性、线程限制等的任何陷阱……如果我发展到数千家企业要同步,我可能会遇到麻烦的任何领域……也许是关于事情的建议了解同步操作和可伸缩性?

非常感谢。干杯。

来自官方文档:https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-write-a-simple-parallel-foreach-loop

The loop partitions the source collection and schedules the work on multiple threads based on the system environment. The more processors on the system, the faster the parallel method runs. For some source collections, a sequential loop may be faster, depending on the size of the source and the kind of work the loop performs.

你不能同时运行他们。并行度总是受 cpus 和(超线程也有帮助)

陷阱

这里有另一个很棒的指南,解释了很多关于并行编程的陷阱:https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism

重点是避免使用非线程安全代码,并行并不总是更快(视情况而定),e.t.c

请注意,您可能无法满足您的要求。如果线程数以千计,并且处理在 10 分钟内未完成,您的下一批将不会启动。您需要扩展到多台机器。

类似于:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
      IEnumerable<string> businessIds = (from x in _db.Poslookup
                               select x.BusinessId).Distinct();

     // Want to multi-thread a sync for each of the businesses in businessIds
     Parallel.ForEach(businessIds, async i =>
     {
         await SyncBusiness(i, stoppingToken);
     });


    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
}

private async Task SyncBusiness(string businessId, CancellationToken stoppingToken)
{
    await new HttpClient().GetAsync($"https://example.com/endpoint/{businessId}", stoppingToken);
}

根据 Peter Bons 评论进行编辑。 替换

Parallel.ForEach(businessIds, async i =>
         {
             await SyncBusiness(i, stoppingToken);
         });

// Want to multi-thread a sync for each of the businesses in businessIds
            IEnumerable<Task> tasks = businessIds.Select(i => SyncBusiness(i, stoppingToken));

            Task.WaitAll(tasks.ToArray());

正如其他人所指出的,您不能将 asyncParallel.ForEach 一起使用。但是,您可以通过一次启动所有 SyncBusiness 调用然后使用 Task.WhenAll:

来使异步代码并发
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  var businessIds = (from x in _db.Poslookup
                     select x.BusinessId).Distinct();

  while (!stoppingToken.IsCancellationRequested)
  {
    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

我还建议让您的数据库查找异步:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

最后的观察是,此代码当前同步所有业务,然后等待十分钟它的工作之间。如果你想让它每10分钟启动一次运行,那么你可以在方法的开头启动定时器:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    var timerTask = Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    tasks.Add(timerTask);
    await Task.WhenAll(tasks);
  }
}