如何同时入队和出队?
How to enqueue and dequeue a lot at the same time?
我有一个包含两个线程的简单场景,其中第一个线程永久读取一些数据并将该数据排入队列。第二个线程首先查看该队列中的单个对象并进行一些条件检查。如果这些都很好,单个对象将出队并传递给某些处理。
我曾尝试使用 ConcurrentQueue
,这是一个简单队列的线程安全实现,但这个的问题是所有调用都是阻塞的。这意味着如果第一个线程正在排队一个对象,第二个线程不能查看或出队一个对象。
在我的情况下,我需要在末尾入队并同时从队列的开头出队。
C#的lock语句也会。
所以我的问题是是否可以并行执行这两个操作,而不会以线程安全的方式相互阻塞。
这是我的第一次尝试,这是我遇到的问题的类似示例。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Scenario {
public class Program {
public static void Main(string[] args) {
Scenario scenario = new Scenario();
scenario.Start();
Console.ReadKey();
}
public class Scenario {
public Scenario() {
someData = new Queue<int>();
}
public void Start() {
Task.Factory.StartNew(firstThread);
Task.Factory.StartNew(secondThread);
}
private void firstThread() {
Random random = new Random();
while (true) {
int newData = random.Next(1, 100);
someData.Enqueue(newData);
Console.WriteLine("Enqueued " + newData);
}
}
private void secondThread() {
Random random = new Random();
while (true) {
if (someData.Count == 0) {
continue;
}
int singleData = someData.Peek();
int someValue = random.Next(1, 100);
if (singleData > someValue || singleData == 1 || singleData == 99) {
singleData = someData.Dequeue();
Console.WriteLine("Dequeued " + singleData);
// ... processing ...
}
}
}
private readonly Queue<int> someData;
}
}
}
第二个例子:
public class Scenario {
public Scenario() {
someData = new ConcurrentQueue<int>();
}
public void Start() {
Task.Factory.StartNew(firstThread);
Task.Factory.StartNew(secondThread);
}
private void firstThread() {
Random random = new Random();
while (true) {
int newData = random.Next(1, 100);
someData.Enqueue(newData);
lock (syncRoot) { Console.WriteLine($"Enqued {enqued++} Dequed {dequed}"); }
}
}
private void secondThread() {
Random random = new Random();
while (true) {
if (!someData.TryPeek(out int singleData)) {
continue;
}
int someValue = random.Next(1, 100);
if (singleData > someValue || singleData == 1 || singleData == 99) {
if (!someData.TryDequeue(out singleData)) {
continue;
}
lock (syncRoot) { Console.WriteLine($"Enqued {enqued} Dequed {dequed++}"); }
// ... processing ...
}
}
}
private int enqued = 0;
private int dequed = 0;
private readonly ConcurrentQueue<int> someData;
private static readonly object syncRoot = new object();
}
首先:我强烈鼓励您重新考虑您的多线程技术和共享内存数据结构是否是正确的方法。具有多个控制线程共享对数据结构的访问的代码很难正确,失败可能是微妙的、灾难性的,并且难以调试。
其次:如果您热衷于多线程和共享内存数据结构,我强烈鼓励您使用专家设计的数据类型,例如并发队列,而不是自己滚动。
现在我已经消除了这些警告:这里有一种方法可以解决您的问题。它非常复杂,您应该获得 C# 内存模型专家的服务,以验证您的解决方案的正确性(如果您采用此解决方案)。如果没有实际上是内存模型专家的人的帮助,我不会认为自己有能力实施我将要描述的方案。
目标是拥有一个支持同时入队和出队操作以及低锁争用的队列。
你想要的是两个不可变的堆栈变量,称为enqueue堆栈和dequeue堆栈,每个都有自己的锁。
入队操作为:
- 取入队锁
- 将项目推入队列堆栈;这会在 O(1) 时间内产生一个新堆栈。
- 将新产生的堆栈分配给入队堆栈变量。
- 释放入队锁
出队操作为:
- 取dequeue锁
- 如果出队栈为空则
- 获取入队锁
- 枚举入队栈并用它来构建出队栈;此 反转 入队堆栈,它保持 属性 我们想要的:先进先出。
- 将空的不可变堆栈分配给入队堆栈变量
- 释放入队锁
- 将新堆栈分配给出队堆栈
- 如果出队堆栈为空,则抛出或放弃并稍后重试,或者休眠直到入队操作发出信号,或者在这里做任何正确的事情。
- 出队堆栈不为空。
- 从出队堆栈中弹出一个项目,在 O(1) 中生成一个新堆栈。
- 将新堆栈分配给出列堆栈变量。
- 释放出队锁。
- 处理项目。
注意,当然如果只有一个线程出队,那么我们根本就不需要出队锁,但是这个方案可以有很多线程出队。
假设入队堆栈上有 1000 个项目,出队堆栈上有 0 个项目。当我们第一次出队时,我们做了一个昂贵的 O(n) 操作来反转入队堆栈一次,但现在我们在出队堆栈上有 1000 个项目。一旦出队堆栈很大,出队线程可以将大部分时间用于处理,而入队线程将大部分时间用于入队。排队锁的争用罕见,但发生时代价高昂。
为什么要使用不可变数据结构?我在这里描述的所有内容也适用于可变堆栈,但是(1)不可变堆栈更容易推理,(2)如果你真的想过着危险的生活,你可以省略一些锁并进行互锁交换操作;确保您了解 一切 关于在低锁定条件下可能重新排序的操作,如果您这样做的话。
更新:
The real problem is that i cant dequeue and process a lot of points because i am permanently reading and enquing new points. That enqueue calls are blocking the processing step.
好吧,如果这是您真正的问题,那么 在问题中提及它 而不是将其隐藏在评论中是个好主意。帮助我们帮助你。
您可以在这里做很多事情。例如,您可以将入队线程的优先级设置为低于出队线程的优先级。或者您可以有多个出队线程,与您机器中的 CPU 一样多。或者,如果出队没有跟上,您可以动态地选择删除一些入队操作。在不了解您的实际问题的情况下,很难就如何解决它提出建议。
我有一个包含两个线程的简单场景,其中第一个线程永久读取一些数据并将该数据排入队列。第二个线程首先查看该队列中的单个对象并进行一些条件检查。如果这些都很好,单个对象将出队并传递给某些处理。
我曾尝试使用 ConcurrentQueue
,这是一个简单队列的线程安全实现,但这个的问题是所有调用都是阻塞的。这意味着如果第一个线程正在排队一个对象,第二个线程不能查看或出队一个对象。
在我的情况下,我需要在末尾入队并同时从队列的开头出队。
C#的lock语句也会。
所以我的问题是是否可以并行执行这两个操作,而不会以线程安全的方式相互阻塞。
这是我的第一次尝试,这是我遇到的问题的类似示例。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Scenario {
public class Program {
public static void Main(string[] args) {
Scenario scenario = new Scenario();
scenario.Start();
Console.ReadKey();
}
public class Scenario {
public Scenario() {
someData = new Queue<int>();
}
public void Start() {
Task.Factory.StartNew(firstThread);
Task.Factory.StartNew(secondThread);
}
private void firstThread() {
Random random = new Random();
while (true) {
int newData = random.Next(1, 100);
someData.Enqueue(newData);
Console.WriteLine("Enqueued " + newData);
}
}
private void secondThread() {
Random random = new Random();
while (true) {
if (someData.Count == 0) {
continue;
}
int singleData = someData.Peek();
int someValue = random.Next(1, 100);
if (singleData > someValue || singleData == 1 || singleData == 99) {
singleData = someData.Dequeue();
Console.WriteLine("Dequeued " + singleData);
// ... processing ...
}
}
}
private readonly Queue<int> someData;
}
}
}
第二个例子:
public class Scenario {
public Scenario() {
someData = new ConcurrentQueue<int>();
}
public void Start() {
Task.Factory.StartNew(firstThread);
Task.Factory.StartNew(secondThread);
}
private void firstThread() {
Random random = new Random();
while (true) {
int newData = random.Next(1, 100);
someData.Enqueue(newData);
lock (syncRoot) { Console.WriteLine($"Enqued {enqued++} Dequed {dequed}"); }
}
}
private void secondThread() {
Random random = new Random();
while (true) {
if (!someData.TryPeek(out int singleData)) {
continue;
}
int someValue = random.Next(1, 100);
if (singleData > someValue || singleData == 1 || singleData == 99) {
if (!someData.TryDequeue(out singleData)) {
continue;
}
lock (syncRoot) { Console.WriteLine($"Enqued {enqued} Dequed {dequed++}"); }
// ... processing ...
}
}
}
private int enqued = 0;
private int dequed = 0;
private readonly ConcurrentQueue<int> someData;
private static readonly object syncRoot = new object();
}
首先:我强烈鼓励您重新考虑您的多线程技术和共享内存数据结构是否是正确的方法。具有多个控制线程共享对数据结构的访问的代码很难正确,失败可能是微妙的、灾难性的,并且难以调试。
其次:如果您热衷于多线程和共享内存数据结构,我强烈鼓励您使用专家设计的数据类型,例如并发队列,而不是自己滚动。
现在我已经消除了这些警告:这里有一种方法可以解决您的问题。它非常复杂,您应该获得 C# 内存模型专家的服务,以验证您的解决方案的正确性(如果您采用此解决方案)。如果没有实际上是内存模型专家的人的帮助,我不会认为自己有能力实施我将要描述的方案。
目标是拥有一个支持同时入队和出队操作以及低锁争用的队列。
你想要的是两个不可变的堆栈变量,称为enqueue堆栈和dequeue堆栈,每个都有自己的锁。
入队操作为:
- 取入队锁
- 将项目推入队列堆栈;这会在 O(1) 时间内产生一个新堆栈。
- 将新产生的堆栈分配给入队堆栈变量。
- 释放入队锁
出队操作为:
- 取dequeue锁
- 如果出队栈为空则
- 获取入队锁
- 枚举入队栈并用它来构建出队栈;此 反转 入队堆栈,它保持 属性 我们想要的:先进先出。
- 将空的不可变堆栈分配给入队堆栈变量
- 释放入队锁
- 将新堆栈分配给出队堆栈
- 如果出队堆栈为空,则抛出或放弃并稍后重试,或者休眠直到入队操作发出信号,或者在这里做任何正确的事情。
- 出队堆栈不为空。
- 从出队堆栈中弹出一个项目,在 O(1) 中生成一个新堆栈。
- 将新堆栈分配给出列堆栈变量。
- 释放出队锁。
- 处理项目。
注意,当然如果只有一个线程出队,那么我们根本就不需要出队锁,但是这个方案可以有很多线程出队。
假设入队堆栈上有 1000 个项目,出队堆栈上有 0 个项目。当我们第一次出队时,我们做了一个昂贵的 O(n) 操作来反转入队堆栈一次,但现在我们在出队堆栈上有 1000 个项目。一旦出队堆栈很大,出队线程可以将大部分时间用于处理,而入队线程将大部分时间用于入队。排队锁的争用罕见,但发生时代价高昂。
为什么要使用不可变数据结构?我在这里描述的所有内容也适用于可变堆栈,但是(1)不可变堆栈更容易推理,(2)如果你真的想过着危险的生活,你可以省略一些锁并进行互锁交换操作;确保您了解 一切 关于在低锁定条件下可能重新排序的操作,如果您这样做的话。
更新:
The real problem is that i cant dequeue and process a lot of points because i am permanently reading and enquing new points. That enqueue calls are blocking the processing step.
好吧,如果这是您真正的问题,那么 在问题中提及它 而不是将其隐藏在评论中是个好主意。帮助我们帮助你。
您可以在这里做很多事情。例如,您可以将入队线程的优先级设置为低于出队线程的优先级。或者您可以有多个出队线程,与您机器中的 CPU 一样多。或者,如果出队没有跟上,您可以动态地选择删除一些入队操作。在不了解您的实际问题的情况下,很难就如何解决它提出建议。