使用相同代码实现时无法模仿 ConcurrentStack 的性能
Cannot mimic performance of ConcurrentStack when using same code implementation
我正在学习编写并发数据结构并将 ConcurrentStack 实现作为学习练习。作为起点,我使用 IlSpy 创建了 ConcurrentStack 实现的副本,将其反编译为 C#。我暂时仅限于调查和使用 Push 和 TryPop 方法。
但是我的实现比使用原来的要慢得多。
我的测试使用 4 个线程(在单个插槽上,4 个核心 CPU),每个线程针对不同的核心具有线程关联性。每个线程执行 1,000,000 次循环,每个循环执行 3 次推送和 3 次弹出。 运行 多次测试完成所有线程的平均时间是...
- 并发堆栈 => 445 毫秒
- 克隆 Push/TryPop => 670 毫秒
因此,据我所知,尽管代码在两者之间是相同的,但克隆还是慢了大约 50%。我 运行 在 运行 中测试了 500 次并取所有 运行 的平均值。所以我不认为问题出在代码的初始 JIT 上。
知道为什么复制这些方法会慢很多吗?
C# 实现
(为了完整起见,我提供了可用于复制结果的 C# 控制台应用程序代码。对于任何有兴趣查看他们是否获得与我相同的结果的人。)
class Program
{
static void Main(string[] args)
{
int processors = Environment.ProcessorCount;
Console.WriteLine("Processors: {0}", processors);
List<Type> runnersT = new List<Type>() { typeof(ThreadRunnerConcurrent),
typeof(ThreadRunnerCASStack)};
int cycles = 500;
foreach (Type runnerT in runnersT)
{
long total = 0;
for (int i = 0; i < cycles; i++)
{
// Create a thread runner per processor
List<ThreadRunner> runners = new List<ThreadRunner>();
for (int j = 0; j < processors; j++)
{
ThreadRunner runner = Activator.CreateInstance(runnerT) as ThreadRunner;
runner.Processor = j;
runners.Add(runner);
}
// Start each runner going
Stopwatch sw = new Stopwatch();
sw.Start();
runners.ForEach((r) => r.Start());
// Wait for all the runners to exit
runners.ForEach((r) => r.Join());
runners.ForEach((r) => r.Check());
sw.Stop();
total += sw.ElapsedMilliseconds;
}
Console.WriteLine("{0} Average: {1}ms", runnerT.Name, (total / cycles));
}
Console.WriteLine("Finished");
Console.ReadLine();
}
}
abstract class ThreadRunner
{
private int _processor;
private Thread _thread;
public ThreadRunner()
{
}
public int Processor
{
get { return _processor; }
set { _processor = value; }
}
public void Start()
{
_thread = new Thread(new ParameterizedThreadStart(Run));
_thread.Start();
}
public void Join()
{
_thread.Join();
}
public abstract void Check();
protected abstract void Run(int cycles);
private void Run(object param)
{
SetAffinity();
Run(1000000);
}
private void SetAffinity()
{
#pragma warning disable 618
int osThreadId = AppDomain.GetCurrentThreadId();
#pragma warning restore 618
// Set the thread's processor affinity
ProcessThread thread = Process.GetCurrentProcess().Threads.Cast<ProcessThread>().Where(t => t.Id == osThreadId).Single();
thread.ProcessorAffinity = new IntPtr(1L << Processor);
}
}
class ThreadRunnerConcurrent : ThreadRunner
{
private static ConcurrentStack<int> _stack = new ConcurrentStack<int>();
protected override void Run(int cycles)
{
int ret;
for (int i = 0; i < cycles; i++)
{
_stack.Push(i);
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
while (!_stack.TryPop(out ret)) ;
}
}
public override void Check()
{
if (_stack.Count > 0)
Console.WriteLine("ThreadRunnerConcurrent has entries!");
}
}
class ThreadRunnerCASStack : ThreadRunner
{
private static CASStack<int> _stack = new CASStack<int>();
protected override void Run(int cycles)
{
int ret;
for (int i = 0; i < cycles; i++)
{
_stack.Push(i);
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
while (!_stack.TryPop(out ret)) ;
}
}
public override void Check()
{
if (_stack.Count > 0)
Console.WriteLine("ThreadRunnerCASStack has entries!");
}
}
class CASStack<T>
{
private class Node
{
internal readonly T m_value;
internal CASStack<T>.Node m_next;
internal Node(T value)
{
this.m_value = value;
this.m_next = null;
}
}
private volatile CASStack<T>.Node m_head;
public void Push(T item)
{
CASStack<T>.Node node = new CASStack<T>.Node(item);
node.m_next = this.m_head;
if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, node, node.m_next) == node.m_next)
return;
PushCore(node, node);
}
private void PushCore(Node head, Node tail)
{
SpinWait spinWait = default(SpinWait);
do
{
spinWait.SpinOnce();
tail.m_next = this.m_head;
}
while (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, head, tail.m_next) != tail.m_next);
}
public bool TryPop(out T result)
{
CASStack<T>.Node head = this.m_head;
if (head == null)
{
result = default(T);
return false;
}
if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, head.m_next, head) == head)
{
result = head.m_value;
return true;
}
return TryPopCore(out result);
}
private bool TryPopCore(out T result)
{
CASStack<T>.Node node;
if (TryPopCore(1, out node) == 1)
{
result = node.m_value;
return true;
}
result = default(T);
return false;
}
private int TryPopCore(int count, out CASStack<T>.Node poppedHead)
{
SpinWait spinWait = default(SpinWait);
int num = 1;
Random random = new Random(Environment.TickCount & 2147483647);
CASStack<T>.Node head;
int num2;
while (true)
{
head = this.m_head;
if (head == null)
break;
CASStack<T>.Node node = head;
num2 = 1;
while (num2 < count && node.m_next != null)
{
node = node.m_next;
num2++;
}
if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, node.m_next, head) == head)
goto Block_5;
for (int i = 0; i < num; i++)
spinWait.SpinOnce();
num = (spinWait.NextSpinWillYield ? random.Next(1, 8) : (num * 2));
}
poppedHead = null;
return 0;
Block_5:
poppedHead = head;
return num2;
}
}
#endregion
ConcurrentStack<T>
有一项优势是您的 CASStack<T>
所没有的,即使两者的代码相同。
ConcurrentStack<T>
在您的计算机上安装了一个 ngen'd native image,它是在您安装 .Net 框架时编译的。您的 CASStack<T>
正在通过 JIT 进行编译,并且由于 JIT 必须很快,因此它执行的优化不如 ngen 中的 AOT 编译器那么多。
我在我的电脑上测试了你的代码。没有生成你的图像,我得到了这些结果:
Processors: 4
ThreadRunnerConcurrent Average: 764ms
ThreadRunnerCASStack Average: 948ms
Finished
生成后:
Processors: 4
ThreadRunnerConcurrent Average: 778ms
ThreadRunnerCASStack Average: 742ms
Finished
我正在学习编写并发数据结构并将 ConcurrentStack 实现作为学习练习。作为起点,我使用 IlSpy 创建了 ConcurrentStack 实现的副本,将其反编译为 C#。我暂时仅限于调查和使用 Push 和 TryPop 方法。
但是我的实现比使用原来的要慢得多。
我的测试使用 4 个线程(在单个插槽上,4 个核心 CPU),每个线程针对不同的核心具有线程关联性。每个线程执行 1,000,000 次循环,每个循环执行 3 次推送和 3 次弹出。 运行 多次测试完成所有线程的平均时间是...
- 并发堆栈 => 445 毫秒
- 克隆 Push/TryPop => 670 毫秒
因此,据我所知,尽管代码在两者之间是相同的,但克隆还是慢了大约 50%。我 运行 在 运行 中测试了 500 次并取所有 运行 的平均值。所以我不认为问题出在代码的初始 JIT 上。
知道为什么复制这些方法会慢很多吗?
C# 实现
(为了完整起见,我提供了可用于复制结果的 C# 控制台应用程序代码。对于任何有兴趣查看他们是否获得与我相同的结果的人。)
class Program
{
static void Main(string[] args)
{
int processors = Environment.ProcessorCount;
Console.WriteLine("Processors: {0}", processors);
List<Type> runnersT = new List<Type>() { typeof(ThreadRunnerConcurrent),
typeof(ThreadRunnerCASStack)};
int cycles = 500;
foreach (Type runnerT in runnersT)
{
long total = 0;
for (int i = 0; i < cycles; i++)
{
// Create a thread runner per processor
List<ThreadRunner> runners = new List<ThreadRunner>();
for (int j = 0; j < processors; j++)
{
ThreadRunner runner = Activator.CreateInstance(runnerT) as ThreadRunner;
runner.Processor = j;
runners.Add(runner);
}
// Start each runner going
Stopwatch sw = new Stopwatch();
sw.Start();
runners.ForEach((r) => r.Start());
// Wait for all the runners to exit
runners.ForEach((r) => r.Join());
runners.ForEach((r) => r.Check());
sw.Stop();
total += sw.ElapsedMilliseconds;
}
Console.WriteLine("{0} Average: {1}ms", runnerT.Name, (total / cycles));
}
Console.WriteLine("Finished");
Console.ReadLine();
}
}
abstract class ThreadRunner
{
private int _processor;
private Thread _thread;
public ThreadRunner()
{
}
public int Processor
{
get { return _processor; }
set { _processor = value; }
}
public void Start()
{
_thread = new Thread(new ParameterizedThreadStart(Run));
_thread.Start();
}
public void Join()
{
_thread.Join();
}
public abstract void Check();
protected abstract void Run(int cycles);
private void Run(object param)
{
SetAffinity();
Run(1000000);
}
private void SetAffinity()
{
#pragma warning disable 618
int osThreadId = AppDomain.GetCurrentThreadId();
#pragma warning restore 618
// Set the thread's processor affinity
ProcessThread thread = Process.GetCurrentProcess().Threads.Cast<ProcessThread>().Where(t => t.Id == osThreadId).Single();
thread.ProcessorAffinity = new IntPtr(1L << Processor);
}
}
class ThreadRunnerConcurrent : ThreadRunner
{
private static ConcurrentStack<int> _stack = new ConcurrentStack<int>();
protected override void Run(int cycles)
{
int ret;
for (int i = 0; i < cycles; i++)
{
_stack.Push(i);
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
while (!_stack.TryPop(out ret)) ;
}
}
public override void Check()
{
if (_stack.Count > 0)
Console.WriteLine("ThreadRunnerConcurrent has entries!");
}
}
class ThreadRunnerCASStack : ThreadRunner
{
private static CASStack<int> _stack = new CASStack<int>();
protected override void Run(int cycles)
{
int ret;
for (int i = 0; i < cycles; i++)
{
_stack.Push(i);
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
_stack.Push(i);
while (!_stack.TryPop(out ret)) ;
while (!_stack.TryPop(out ret)) ;
}
}
public override void Check()
{
if (_stack.Count > 0)
Console.WriteLine("ThreadRunnerCASStack has entries!");
}
}
class CASStack<T>
{
private class Node
{
internal readonly T m_value;
internal CASStack<T>.Node m_next;
internal Node(T value)
{
this.m_value = value;
this.m_next = null;
}
}
private volatile CASStack<T>.Node m_head;
public void Push(T item)
{
CASStack<T>.Node node = new CASStack<T>.Node(item);
node.m_next = this.m_head;
if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, node, node.m_next) == node.m_next)
return;
PushCore(node, node);
}
private void PushCore(Node head, Node tail)
{
SpinWait spinWait = default(SpinWait);
do
{
spinWait.SpinOnce();
tail.m_next = this.m_head;
}
while (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, head, tail.m_next) != tail.m_next);
}
public bool TryPop(out T result)
{
CASStack<T>.Node head = this.m_head;
if (head == null)
{
result = default(T);
return false;
}
if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, head.m_next, head) == head)
{
result = head.m_value;
return true;
}
return TryPopCore(out result);
}
private bool TryPopCore(out T result)
{
CASStack<T>.Node node;
if (TryPopCore(1, out node) == 1)
{
result = node.m_value;
return true;
}
result = default(T);
return false;
}
private int TryPopCore(int count, out CASStack<T>.Node poppedHead)
{
SpinWait spinWait = default(SpinWait);
int num = 1;
Random random = new Random(Environment.TickCount & 2147483647);
CASStack<T>.Node head;
int num2;
while (true)
{
head = this.m_head;
if (head == null)
break;
CASStack<T>.Node node = head;
num2 = 1;
while (num2 < count && node.m_next != null)
{
node = node.m_next;
num2++;
}
if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, node.m_next, head) == head)
goto Block_5;
for (int i = 0; i < num; i++)
spinWait.SpinOnce();
num = (spinWait.NextSpinWillYield ? random.Next(1, 8) : (num * 2));
}
poppedHead = null;
return 0;
Block_5:
poppedHead = head;
return num2;
}
}
#endregion
ConcurrentStack<T>
有一项优势是您的 CASStack<T>
所没有的,即使两者的代码相同。
ConcurrentStack<T>
在您的计算机上安装了一个 ngen'd native image,它是在您安装 .Net 框架时编译的。您的 CASStack<T>
正在通过 JIT 进行编译,并且由于 JIT 必须很快,因此它执行的优化不如 ngen 中的 AOT 编译器那么多。
我在我的电脑上测试了你的代码。没有生成你的图像,我得到了这些结果:
Processors: 4
ThreadRunnerConcurrent Average: 764ms
ThreadRunnerCASStack Average: 948ms
Finished
生成后:
Processors: 4
ThreadRunnerConcurrent Average: 778ms
ThreadRunnerCASStack Average: 742ms
Finished