.NET 异步队列处理和一些额外功能

.NET Async Queue Processing with a few extras

我现在有点无助。 我已经阅读了很多关于异步线程的文章,但没有一篇适合我的情况,我也没有完整地了解它。

我有一个带有 table "jobs" 的数据库,它通过应用程序 "Importer" 不断增长。 此 table 在此示例中具有三列(Id、customer_id、DateOfEntry)。 测试数据:

1,A,Date
2,A,Date
3,B,Date
4,C,Date
5,B,Date
...

我有第二个应用程序 "JobWorker" 可以处理该工作。 但有以下限制。

我想在应用程序 "JobWorker" 中启动与客户数量相同的异步操作,在此示例中为 3。 越来越难了。 这个异步工作者应该有自己的队列,它总是可以并行(同步)地处理一个作业。 1) 他们必须从 table 加载最早的作业。 2)处理它 3) 从 table

加载下一个

现在是棘手的部分,实际上有 100 个客户,但他们不是连续发送作业(但我不知道),但我想最多同时处理 10 个作业。 (但不要忘记每个客户只有 1 个)。

我怎样才能做到这一点。我知道拼图(SemaphoreSlim、ActionBlock),但我没有完成拼图;(

[编辑] 我目前的尝试:

public class FakeJob
{
    public int Id { get; set; }

    public string ProjectName { get; set; }

    public int Duration { get; set; }
}


public class JobMaster
{
    private IConfigurationRoot _configuration;

    private BufferBlock<ActionBlock<FakeJob>> _mainQueue;

    private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;

    private Dictionary<Guid, CancellationTokenSource> _projectCancellationTokens;


    public JobMaster()
    {
        _mainQueue = new BufferBlock<ActionBlock<FakeJob>>(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
        _projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
    }


    public async Task WorkOnJobs()
    {
        List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
        List<Task> producerTasks = new List<Task>();
        List<FakeJob> jobs = new List<FakeJob>();


        jobs.Add(new FakeJob { Duration = 10, Id = 1, ProjectName = projectIds[0] });
        jobs.Add(new FakeJob { Duration = 10, Id = 2, ProjectName = projectIds[0] });
        jobs.Add(new FakeJob { Duration = 10, Id = 3, ProjectName = projectIds[0] });
        jobs.Add(new FakeJob { Duration = 4, Id = 4, ProjectName = projectIds[1] });
        jobs.Add(new FakeJob { Duration = 4, Id = 5, ProjectName = projectIds[1] });
        jobs.Add(new FakeJob { Duration = 4, Id = 6, ProjectName = projectIds[1] });
        jobs.Add(new FakeJob { Duration = 2, Id = 7, ProjectName = projectIds[2] });
        jobs.Add(new FakeJob { Duration = 2, Id = 8, ProjectName = projectIds[2] });
        jobs.Add(new FakeJob { Duration = 2, Id = 9, ProjectName = projectIds[2] });


        foreach (var loopProjectId in projectIds)
        {
            producerTasks.Add(WorkOnJobsForForProject(loopProjectId, jobs));
        }


        await Task.WhenAll(producerTasks);
    }


    private async Task WorkOnJobsForForProject(string projectId, List<FakeJob> jobDB)
    {
        var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

        while (true)
        {
            foreach (var loopJob in jobDB.Where(x => x.ProjectName == projectId))
            {
                var consumer = new ActionBlock<FakeJob>(StartJob, consumerOptions);
                _projectQueues[projectId] = consumer;

                await _mainQueue.SendAsync(_projectQueues[projectId]);
                await _projectQueues[projectId].SendAsync(loopJob);
                await Task.WhenAll(_projectQueues[projectId].Completion);
            }

            break;
        }
    }


    private async Task StartJob(FakeJob job)
    {
        Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
        await Task.Delay(job.Duration * 1000);
        _projectQueues[job.ProjectName].Complete();
        Log.Logger.Information("Finished job [{A}]", job.Id);
    }
}

[编辑 2] 我当前的尝试使用 MaxDegreeOfParallelism = 3 for _mainQueue。 但不是 2 ;( 如果我将它设置为 2,则不会执行作业 9 ;(

public class FakeJob
{
    public int Id { get; set; }

    public string ProjectName { get; set; }

    public int Duration { get; set; }

    public bool IsComplete { get; set; }
}


public class JobMaster_BackUp
{
    private ActionBlock<CustomerQueue> _mainQueue;

    private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;

    public static List<FakeJob> FakeJobDB = new List<FakeJob>();


    public JobMaster_BackUp()
    {
        _mainQueue = new ActionBlock<CustomerQueue>(MainQueueJob, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
        _projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
    }


    public async Task WorkOnJobs()
    {
        List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
        List<Task> producerTasks = new List<Task>();



        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 1, ProjectName = projectIds[0] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 2, ProjectName = projectIds[0] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 3, ProjectName = projectIds[0] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 4, ProjectName = projectIds[1] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 5, ProjectName = projectIds[1] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 6, ProjectName = projectIds[1] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 7, ProjectName = projectIds[2] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 8, ProjectName = projectIds[2] });
        FakeJobDB.Add(new FakeJob { Duration = 3, Id = 9, ProjectName = projectIds[2] });


        foreach (var loopProjectId in projectIds)
        {
            CancellationTokenHandler.ProjectCancellationTokens[loopProjectId] = new CancellationTokenSource();
            producerTasks.Add(WorkOnJobsForForProject(loopProjectId, CancellationTokenHandler.ProjectCancellationTokens[loopProjectId].Token));
        }


        await Task.WhenAll(producerTasks);
    }


    private FakeJob GetNextJob(string projectId)
    {
        FakeJob nextJob = FakeJobDB.Where(x => x.ProjectName == projectId && x.IsComplete == false).OrderBy(x => x.Id).FirstOrDefault();

        if (nextJob != null)
        {
            Log.Logger.Information("GetNextJob [" + nextJob.Id + "]");
        }

        return nextJob;
    }


    private async Task WorkOnJobsForForProject(string projectId, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            FakeJob loopJob = GetNextJob(projectId);

            if (loopJob != null)
            {
                CustomerQueue customerQueue = new CustomerQueue(loopJob);

                await _mainQueue.SendAsync(customerQueue);

                await customerQueue.WaitForCompletion();
            }
        }
    }





    private async Task MainQueueJob(CustomerQueue consumer)
    {
        consumer.Start();
        await Task.WhenAll(consumer.WaitForCompletion());
    }
}


public class CustomerQueue
{
    private ActionBlock<FakeJob> _queue;

    private FakeJob _job;


    public CustomerQueue(FakeJob job)
    {
        _job = job;

        var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
        _queue = new ActionBlock<FakeJob>(StartJob, consumerOptions);
    }


    public void Start()
    {
        _queue.SendAsync(_job);
    }


    public async Task WaitForCompletion()
    {
        await Task.WhenAll(_queue.Completion);
    }


    private async Task StartJob(FakeJob job)
    {
        //Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
        await Task.Delay(job.Duration * 1000);

        JobMaster_BackUp.FakeJobDB.Single(x => x.Id == job.Id).IsComplete = true;

        _queue.Complete();

        Log.Logger.Information("Finished job [{A}]", job.Id);
    }
}

This does one thing right. It just executes one job per customer in parallel. But the max 2 Jobs in total does not work.

我不清楚你想要完成什么,但我认为你缺少的是不同数据流块的协调。

有几种方法可以实现这一点。一种是使用 SemaphoreSlim,正如您已经注意到的那样。您将创建一个最大计数为 10 的单个 SemaphoreSlim,将其传递给 CustomerQueue 构造函数,并让您的 StartJob 在开头执行 await WaitAsync 并在 Release 最后。

另一种方法是为操作块提供一个调度程序 - 具体来说,ConcurrentExclusiveSchedulerPairConcurrent 一半,并发设置为 10。您可以将其传递到 ConsumerQueue 构造函数并将其设置在块选项上。