.NET 异步队列处理和一些额外功能
.NET Async Queue Processing with a few extras
我现在有点无助。
我已经阅读了很多关于异步线程的文章,但没有一篇适合我的情况,我也没有完整地了解它。
我有一个带有 table "jobs" 的数据库,它通过应用程序 "Importer" 不断增长。
此 table 在此示例中具有三列(Id、customer_id、DateOfEntry)。
测试数据:
1,A,Date
2,A,Date
3,B,Date
4,C,Date
5,B,Date
...
我有第二个应用程序 "JobWorker" 可以处理该工作。
但有以下限制。
我想在应用程序 "JobWorker" 中启动与客户数量相同的异步操作,在此示例中为 3。
越来越难了。
这个异步工作者应该有自己的队列,它总是可以并行(同步)地处理一个作业。
1) 他们必须从 table 加载最早的作业。
2)处理它
3) 从 table
加载下一个
现在是棘手的部分,实际上有 100 个客户,但他们不是连续发送作业(但我不知道),但我想最多同时处理 10 个作业。 (但不要忘记每个客户只有 1 个)。
我怎样才能做到这一点。我知道拼图(SemaphoreSlim、ActionBlock),但我没有完成拼图;(
[编辑]
我目前的尝试:
public class FakeJob
{
public int Id { get; set; }
public string ProjectName { get; set; }
public int Duration { get; set; }
}
public class JobMaster
{
private IConfigurationRoot _configuration;
private BufferBlock<ActionBlock<FakeJob>> _mainQueue;
private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;
private Dictionary<Guid, CancellationTokenSource> _projectCancellationTokens;
public JobMaster()
{
_mainQueue = new BufferBlock<ActionBlock<FakeJob>>(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
_projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
}
public async Task WorkOnJobs()
{
List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
List<Task> producerTasks = new List<Task>();
List<FakeJob> jobs = new List<FakeJob>();
jobs.Add(new FakeJob { Duration = 10, Id = 1, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 10, Id = 2, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 10, Id = 3, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 4, Id = 4, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 4, Id = 5, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 4, Id = 6, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 2, Id = 7, ProjectName = projectIds[2] });
jobs.Add(new FakeJob { Duration = 2, Id = 8, ProjectName = projectIds[2] });
jobs.Add(new FakeJob { Duration = 2, Id = 9, ProjectName = projectIds[2] });
foreach (var loopProjectId in projectIds)
{
producerTasks.Add(WorkOnJobsForForProject(loopProjectId, jobs));
}
await Task.WhenAll(producerTasks);
}
private async Task WorkOnJobsForForProject(string projectId, List<FakeJob> jobDB)
{
var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
while (true)
{
foreach (var loopJob in jobDB.Where(x => x.ProjectName == projectId))
{
var consumer = new ActionBlock<FakeJob>(StartJob, consumerOptions);
_projectQueues[projectId] = consumer;
await _mainQueue.SendAsync(_projectQueues[projectId]);
await _projectQueues[projectId].SendAsync(loopJob);
await Task.WhenAll(_projectQueues[projectId].Completion);
}
break;
}
}
private async Task StartJob(FakeJob job)
{
Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
await Task.Delay(job.Duration * 1000);
_projectQueues[job.ProjectName].Complete();
Log.Logger.Information("Finished job [{A}]", job.Id);
}
}
[编辑 2]
我当前的尝试使用 MaxDegreeOfParallelism = 3 for _mainQueue。
但不是 2 ;(
如果我将它设置为 2,则不会执行作业 9 ;(
public class FakeJob
{
public int Id { get; set; }
public string ProjectName { get; set; }
public int Duration { get; set; }
public bool IsComplete { get; set; }
}
public class JobMaster_BackUp
{
private ActionBlock<CustomerQueue> _mainQueue;
private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;
public static List<FakeJob> FakeJobDB = new List<FakeJob>();
public JobMaster_BackUp()
{
_mainQueue = new ActionBlock<CustomerQueue>(MainQueueJob, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
_projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
}
public async Task WorkOnJobs()
{
List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
List<Task> producerTasks = new List<Task>();
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 1, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 2, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 3, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 4, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 5, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 6, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 7, ProjectName = projectIds[2] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 8, ProjectName = projectIds[2] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 9, ProjectName = projectIds[2] });
foreach (var loopProjectId in projectIds)
{
CancellationTokenHandler.ProjectCancellationTokens[loopProjectId] = new CancellationTokenSource();
producerTasks.Add(WorkOnJobsForForProject(loopProjectId, CancellationTokenHandler.ProjectCancellationTokens[loopProjectId].Token));
}
await Task.WhenAll(producerTasks);
}
private FakeJob GetNextJob(string projectId)
{
FakeJob nextJob = FakeJobDB.Where(x => x.ProjectName == projectId && x.IsComplete == false).OrderBy(x => x.Id).FirstOrDefault();
if (nextJob != null)
{
Log.Logger.Information("GetNextJob [" + nextJob.Id + "]");
}
return nextJob;
}
private async Task WorkOnJobsForForProject(string projectId, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
FakeJob loopJob = GetNextJob(projectId);
if (loopJob != null)
{
CustomerQueue customerQueue = new CustomerQueue(loopJob);
await _mainQueue.SendAsync(customerQueue);
await customerQueue.WaitForCompletion();
}
}
}
private async Task MainQueueJob(CustomerQueue consumer)
{
consumer.Start();
await Task.WhenAll(consumer.WaitForCompletion());
}
}
public class CustomerQueue
{
private ActionBlock<FakeJob> _queue;
private FakeJob _job;
public CustomerQueue(FakeJob job)
{
_job = job;
var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
_queue = new ActionBlock<FakeJob>(StartJob, consumerOptions);
}
public void Start()
{
_queue.SendAsync(_job);
}
public async Task WaitForCompletion()
{
await Task.WhenAll(_queue.Completion);
}
private async Task StartJob(FakeJob job)
{
//Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
await Task.Delay(job.Duration * 1000);
JobMaster_BackUp.FakeJobDB.Single(x => x.Id == job.Id).IsComplete = true;
_queue.Complete();
Log.Logger.Information("Finished job [{A}]", job.Id);
}
}
This does one thing right. It just executes one job per customer in parallel. But the max 2 Jobs in total does not work.
我不清楚你想要完成什么,但我认为你缺少的是不同数据流块的协调。
有几种方法可以实现这一点。一种是使用 SemaphoreSlim
,正如您已经注意到的那样。您将创建一个最大计数为 10 的单个 SemaphoreSlim
,将其传递给 CustomerQueue
构造函数,并让您的 StartJob
在开头执行 await WaitAsync
并在 Release
最后。
另一种方法是为操作块提供一个调度程序 - 具体来说,ConcurrentExclusiveSchedulerPair
的 Concurrent
一半,并发设置为 10。您可以将其传递到 ConsumerQueue
构造函数并将其设置在块选项上。
我现在有点无助。 我已经阅读了很多关于异步线程的文章,但没有一篇适合我的情况,我也没有完整地了解它。
我有一个带有 table "jobs" 的数据库,它通过应用程序 "Importer" 不断增长。 此 table 在此示例中具有三列(Id、customer_id、DateOfEntry)。 测试数据:
1,A,Date
2,A,Date
3,B,Date
4,C,Date
5,B,Date
...
我有第二个应用程序 "JobWorker" 可以处理该工作。 但有以下限制。
我想在应用程序 "JobWorker" 中启动与客户数量相同的异步操作,在此示例中为 3。 越来越难了。 这个异步工作者应该有自己的队列,它总是可以并行(同步)地处理一个作业。 1) 他们必须从 table 加载最早的作业。 2)处理它 3) 从 table
加载下一个现在是棘手的部分,实际上有 100 个客户,但他们不是连续发送作业(但我不知道),但我想最多同时处理 10 个作业。 (但不要忘记每个客户只有 1 个)。
我怎样才能做到这一点。我知道拼图(SemaphoreSlim、ActionBlock),但我没有完成拼图;(
[编辑] 我目前的尝试:
public class FakeJob
{
public int Id { get; set; }
public string ProjectName { get; set; }
public int Duration { get; set; }
}
public class JobMaster
{
private IConfigurationRoot _configuration;
private BufferBlock<ActionBlock<FakeJob>> _mainQueue;
private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;
private Dictionary<Guid, CancellationTokenSource> _projectCancellationTokens;
public JobMaster()
{
_mainQueue = new BufferBlock<ActionBlock<FakeJob>>(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
_projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
}
public async Task WorkOnJobs()
{
List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
List<Task> producerTasks = new List<Task>();
List<FakeJob> jobs = new List<FakeJob>();
jobs.Add(new FakeJob { Duration = 10, Id = 1, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 10, Id = 2, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 10, Id = 3, ProjectName = projectIds[0] });
jobs.Add(new FakeJob { Duration = 4, Id = 4, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 4, Id = 5, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 4, Id = 6, ProjectName = projectIds[1] });
jobs.Add(new FakeJob { Duration = 2, Id = 7, ProjectName = projectIds[2] });
jobs.Add(new FakeJob { Duration = 2, Id = 8, ProjectName = projectIds[2] });
jobs.Add(new FakeJob { Duration = 2, Id = 9, ProjectName = projectIds[2] });
foreach (var loopProjectId in projectIds)
{
producerTasks.Add(WorkOnJobsForForProject(loopProjectId, jobs));
}
await Task.WhenAll(producerTasks);
}
private async Task WorkOnJobsForForProject(string projectId, List<FakeJob> jobDB)
{
var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
while (true)
{
foreach (var loopJob in jobDB.Where(x => x.ProjectName == projectId))
{
var consumer = new ActionBlock<FakeJob>(StartJob, consumerOptions);
_projectQueues[projectId] = consumer;
await _mainQueue.SendAsync(_projectQueues[projectId]);
await _projectQueues[projectId].SendAsync(loopJob);
await Task.WhenAll(_projectQueues[projectId].Completion);
}
break;
}
}
private async Task StartJob(FakeJob job)
{
Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
await Task.Delay(job.Duration * 1000);
_projectQueues[job.ProjectName].Complete();
Log.Logger.Information("Finished job [{A}]", job.Id);
}
}
[编辑 2] 我当前的尝试使用 MaxDegreeOfParallelism = 3 for _mainQueue。 但不是 2 ;( 如果我将它设置为 2,则不会执行作业 9 ;(
public class FakeJob
{
public int Id { get; set; }
public string ProjectName { get; set; }
public int Duration { get; set; }
public bool IsComplete { get; set; }
}
public class JobMaster_BackUp
{
private ActionBlock<CustomerQueue> _mainQueue;
private Dictionary<string, ActionBlock<FakeJob>> _projectQueues;
public static List<FakeJob> FakeJobDB = new List<FakeJob>();
public JobMaster_BackUp()
{
_mainQueue = new ActionBlock<CustomerQueue>(MainQueueJob, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
_projectQueues = new Dictionary<string, ActionBlock<FakeJob>>();
}
public async Task WorkOnJobs()
{
List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" };
List<Task> producerTasks = new List<Task>();
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 1, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 2, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 3, ProjectName = projectIds[0] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 4, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 5, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 6, ProjectName = projectIds[1] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 7, ProjectName = projectIds[2] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 8, ProjectName = projectIds[2] });
FakeJobDB.Add(new FakeJob { Duration = 3, Id = 9, ProjectName = projectIds[2] });
foreach (var loopProjectId in projectIds)
{
CancellationTokenHandler.ProjectCancellationTokens[loopProjectId] = new CancellationTokenSource();
producerTasks.Add(WorkOnJobsForForProject(loopProjectId, CancellationTokenHandler.ProjectCancellationTokens[loopProjectId].Token));
}
await Task.WhenAll(producerTasks);
}
private FakeJob GetNextJob(string projectId)
{
FakeJob nextJob = FakeJobDB.Where(x => x.ProjectName == projectId && x.IsComplete == false).OrderBy(x => x.Id).FirstOrDefault();
if (nextJob != null)
{
Log.Logger.Information("GetNextJob [" + nextJob.Id + "]");
}
return nextJob;
}
private async Task WorkOnJobsForForProject(string projectId, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
FakeJob loopJob = GetNextJob(projectId);
if (loopJob != null)
{
CustomerQueue customerQueue = new CustomerQueue(loopJob);
await _mainQueue.SendAsync(customerQueue);
await customerQueue.WaitForCompletion();
}
}
}
private async Task MainQueueJob(CustomerQueue consumer)
{
consumer.Start();
await Task.WhenAll(consumer.WaitForCompletion());
}
}
public class CustomerQueue
{
private ActionBlock<FakeJob> _queue;
private FakeJob _job;
public CustomerQueue(FakeJob job)
{
_job = job;
var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
_queue = new ActionBlock<FakeJob>(StartJob, consumerOptions);
}
public void Start()
{
_queue.SendAsync(_job);
}
public async Task WaitForCompletion()
{
await Task.WhenAll(_queue.Completion);
}
private async Task StartJob(FakeJob job)
{
//Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName);
await Task.Delay(job.Duration * 1000);
JobMaster_BackUp.FakeJobDB.Single(x => x.Id == job.Id).IsComplete = true;
_queue.Complete();
Log.Logger.Information("Finished job [{A}]", job.Id);
}
}
This does one thing right. It just executes one job per customer in parallel. But the max 2 Jobs in total does not work.
我不清楚你想要完成什么,但我认为你缺少的是不同数据流块的协调。
有几种方法可以实现这一点。一种是使用 SemaphoreSlim
,正如您已经注意到的那样。您将创建一个最大计数为 10 的单个 SemaphoreSlim
,将其传递给 CustomerQueue
构造函数,并让您的 StartJob
在开头执行 await WaitAsync
并在 Release
最后。
另一种方法是为操作块提供一个调度程序 - 具体来说,ConcurrentExclusiveSchedulerPair
的 Concurrent
一半,并发设置为 10。您可以将其传递到 ConsumerQueue
构造函数并将其设置在块选项上。