C# - 带插槽的工作线程,动态添加的项目
C# - Worker thread with slots, items being dynamically added
我有 window 服务,每 30 秒轮询一次 Web 服务以获取新项目。如果发现任何新项目,它会检查它们是否需要“处理”,然后将它们放入列表中进行处理。我产生不同的线程一次处理 5 个,当一个完成时,另一个将填充空槽。一切完成后,程序将休眠 30 秒,然后再次轮询。
我的问题是,在处理项目(最多可能需要 15 分钟)的同时,正在创建可能也需要处理的新项目。我的问题是主线程在等待每个最后一个线程完成后才休眠并重新开始整个过程。
我想要做的是让主线程继续每 30 秒轮询一次 Web 服务,但是不要被阻止,而是将它找到的任何新项目添加到列表中,该列表将在单独的工作线程。在那个工作线程中,它仍然会说只有 5 个插槽可用,但它们基本上总是会被填满,假设主线程继续寻找要处理的新项目。
我希望这是有道理的。谢谢!
编辑:更新代码示例
我把它放在一起作为在 ConcurrentQueue 上运行的工作线程。有什么办法可以改善吗?
private void ThreadWorker() {
DateTime dtStart = DateTime.Now;
int iNumOfConcurrentSlots = 6
Thread[] threads = new Thread[iNumOfConcurrentSlots];
while (true) {
for (int i = 0; i < m_iNumOfConcurrentSlots; i++) {
if (m_tAssetQueue.TryDequeue(out Asset aa)) {
threads[i] = new Thread(() => ProcessAsset(aa));
threads[i].Start();
Thread.Sleep(500);
}
}
}
}
编辑:啊是的,上面的方法行不通。我需要一种方法,能够不对 ConcurrentSlots 的数量进行硬编码,而是让每个线程基本上等待并在队列中寻找一些东西,如果找到了,就处理它。但是我还需要一种方法来表明 ProcessAsset() 函数已完成以释放线程并允许创建另一个线程....
一个简单的方法是让 5 个线程从并发队列中读取数据。主线程对项目进行排队,工作线程阻塞从队列中读取。
注意:workers 处于无限循环中。他们调用 TryDequeue,如果他们得到一个就处理这个项目,或者如果他们没有得到某样东西就休眠一秒钟。他们还可以检查退出标志。
为了让您的服务 属性 正常运行,您可能需要一个独立的轮询线程来对项目进行排队。保持主线程响应启动、停止、暂停请求。
工作线程的伪代码:
While true
If TryDequeue then
process data
If exit flag is true, break
While pause flag, sleep
Sleep
轮询线程的伪代码:
While true
Poll web service
Queue items in concurrent queue
If exit flag true, break
While pause flag, sleep
Sleep
主线程的伪代码:
Start polling thread
Start n worker threads with above code
Handle stop:
set exit flag to true
Handle pause
set pause flag to true
我有 window 服务,每 30 秒轮询一次 Web 服务以获取新项目。如果发现任何新项目,它会检查它们是否需要“处理”,然后将它们放入列表中进行处理。我产生不同的线程一次处理 5 个,当一个完成时,另一个将填充空槽。一切完成后,程序将休眠 30 秒,然后再次轮询。
我的问题是,在处理项目(最多可能需要 15 分钟)的同时,正在创建可能也需要处理的新项目。我的问题是主线程在等待每个最后一个线程完成后才休眠并重新开始整个过程。
我想要做的是让主线程继续每 30 秒轮询一次 Web 服务,但是不要被阻止,而是将它找到的任何新项目添加到列表中,该列表将在单独的工作线程。在那个工作线程中,它仍然会说只有 5 个插槽可用,但它们基本上总是会被填满,假设主线程继续寻找要处理的新项目。
我希望这是有道理的。谢谢!
编辑:更新代码示例
我把它放在一起作为在 ConcurrentQueue 上运行的工作线程。有什么办法可以改善吗?
private void ThreadWorker() {
DateTime dtStart = DateTime.Now;
int iNumOfConcurrentSlots = 6
Thread[] threads = new Thread[iNumOfConcurrentSlots];
while (true) {
for (int i = 0; i < m_iNumOfConcurrentSlots; i++) {
if (m_tAssetQueue.TryDequeue(out Asset aa)) {
threads[i] = new Thread(() => ProcessAsset(aa));
threads[i].Start();
Thread.Sleep(500);
}
}
}
}
编辑:啊是的,上面的方法行不通。我需要一种方法,能够不对 ConcurrentSlots 的数量进行硬编码,而是让每个线程基本上等待并在队列中寻找一些东西,如果找到了,就处理它。但是我还需要一种方法来表明 ProcessAsset() 函数已完成以释放线程并允许创建另一个线程....
一个简单的方法是让 5 个线程从并发队列中读取数据。主线程对项目进行排队,工作线程阻塞从队列中读取。
注意:workers 处于无限循环中。他们调用 TryDequeue,如果他们得到一个就处理这个项目,或者如果他们没有得到某样东西就休眠一秒钟。他们还可以检查退出标志。
为了让您的服务 属性 正常运行,您可能需要一个独立的轮询线程来对项目进行排队。保持主线程响应启动、停止、暂停请求。
工作线程的伪代码:
While true
If TryDequeue then
process data
If exit flag is true, break
While pause flag, sleep
Sleep
轮询线程的伪代码:
While true
Poll web service
Queue items in concurrent queue
If exit flag true, break
While pause flag, sleep
Sleep
主线程的伪代码:
Start polling thread
Start n worker threads with above code
Handle stop:
set exit flag to true
Handle pause
set pause flag to true