如何从更基本的同步原语中创建 multiple-read/single-write 锁?
How to make a multiple-read/single-write lock from more basic synchronization primitives?
我们发现我们的代码中有几个地方,受互斥锁保护的数据的并发读取相当普遍,而写入则很少见。我们的测量结果似乎表明,使用简单的互斥量会严重阻碍读取该数据的代码的性能。所以我们需要的是 multiple-read/single-write 互斥锁。我知道这可以建立在更简单的原语之上,但在我自己尝试之前,我宁愿询问现有的知识:
构建 multiple-read/single-write 锁定更简单的同步原语的批准方法是什么?
我确实知道如何制作它,但我宁愿得到不偏不倚的答案(可能是错误的)。 (注意:我期望的是如何做的解释,可能是伪代码,而不是完整的实现。我当然可以自己编写代码。)
注意事项:
这需要有合理的表现。 (我的想法是每次访问需要两次 lock/unlock 操作。现在这可能还不够好,但需要很多操作似乎不合理。)
一般情况下,读的次数比较多,但是写比读更重要,对性能更敏感。读者不能饿死作家。
我们被困在一个相当老旧的嵌入式平台(VxWorks 5.5 的专有变体)上,有一个相当老旧的编译器(GCC 4.1.2)和 boost 1.52——除了 boost 的大部分部件依赖在 POSIX 上,因为 POSIX 未在该平台上完全实现。可用的锁定原语基本上是几种信号量(二进制、计数等),我们已经在其上创建了互斥体、条件变量和监视器。
这是IA32,单核。
Concurrent reads of data protected by a mutex are rather common, while writes are rare
这听起来像是 User-space RCU 的理想场景:
URCU is similar to its Linux-kernel counterpart, providing a replacement for reader-writer locking, among other uses. This similarity continues with readers not synchronizing directly with RCU updaters, thus making RCU read-side code paths exceedingly fast, while furthermore permitting RCU readers to make useful forward progress even when running concurrently with RCU updaters—and vice versa.
一如既往,最好的解决方案将取决于细节。 A read-write spin lock may be what you're looking for,但其他方法(如上面建议的读取-复制-更新)可能是一种解决方案 - 尽管在旧的嵌入式平台上,使用的额外内存可能是一个问题。对于罕见的写入,我经常使用任务系统安排工作,这样写入只能在没有从该数据结构读取时发生,但这取决于算法。
Concurrent Control with Readers and Writers; P.J. Courtois, F. Heymans, and D.L. Parnas; MBLE Research Laboratory; Brussels, Belgium 中描述了一种基于信号量和互斥量的算法。
这是基于我的 Boost headers 的简化答案(我将 Boost 称为 批准的方式 )。它只需要条件变量和互斥锁。我使用 Windows 原语重写了它,因为我发现它们具有描述性并且非常简单,但将其视为伪代码。
这是一个非常简单的解决方案,不支持互斥量升级或 try_lock() 操作等操作。如果你愿意,我可以添加这些。我还去掉了一些多余的东西,比如禁用并非绝对必要的中断。
此外,值得一试 boost\thread\pthread\shared_mutex.hpp
(这是基于此)。这是 human-readable.
class SharedMutex {
CRITICAL_SECTION m_state_mutex;
CONDITION_VARIABLE m_shared_cond;
CONDITION_VARIABLE m_exclusive_cond;
size_t shared_count;
bool exclusive;
// This causes write blocks to prevent further read blocks
bool exclusive_wait_blocked;
SharedMutex() : shared_count(0), exclusive(false)
{
InitializeConditionVariable (m_shared_cond);
InitializeConditionVariable (m_exclusive_cond);
InitializeCriticalSection (m_state_mutex);
}
~SharedMutex()
{
DeleteCriticalSection (&m_state_mutex);
DeleteConditionVariable (&m_exclusive_cond);
DeleteConditionVariable (&m_shared_cond);
}
// Write lock
void lock(void)
{
EnterCriticalSection (&m_state_mutex);
while (shared_count > 0 || exclusive)
{
exclusive_waiting_blocked = true;
SleepConditionVariableCS (&m_exclusive_cond, &m_state_mutex, INFINITE)
}
// This thread now 'owns' the mutex
exclusive = true;
LeaveCriticalSection (&m_state_mutex);
}
void unlock(void)
{
EnterCriticalSection (&m_state_mutex);
exclusive = false;
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
// Read lock
void lock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
while (exclusive || exclusive_waiting_blocked)
{
SleepConditionVariableCS (&m_shared_cond, m_state_mutex, INFINITE);
}
++shared_count;
LeaveCriticalSection (&m_state_mutex);
}
void unlock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
--shared_count;
if (shared_count == 0)
{
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
else
{
LeaveCriticalSection (&m_state_mutex);
}
}
};
行为
好的,这个算法的行为有些混乱,下面是它的工作原理。
写入锁定期间 - 读取器和写入器都被阻止。
在写锁结束时 - Reader 线程和一个写入线程将竞赛 以查看哪个开始.
在读取锁定期间 - 写入器被阻止。 Readers 也被阻止 当且仅当 作家被阻止时。
在释放最后一个读锁时 - Reader 线程和一个写入线程将竞速 以查看哪一个开始。
如果在通知期间处理器在 m_exclusive_cond
之前频繁将上下文切换到 m_shared_cond
线程,这 可能会 导致读者饿死作者,但我怀疑这个问题是理论上的而不是实际的,因为它是 Boost 的算法。
您似乎只有互斥量和 condition_variable 作为同步原语。因此,我在这里写了一个 reader-writer lock,它使 readers 饿死。它使用一个互斥量,两个 conditional_variable 和三个整数。
readers - readers in the cv readerQ plus the reading reader
writers - writers in cv writerQ plus the writing writer
active_writers - the writer currently writing. can only be 1 or 0.
它就这样饿死了 readers。如果有几个作家要写,readers 将永远没有机会阅读,直到所有作家都写完为止。这是因为稍后 readers 需要检查 writers
变量。同时,active_writers
变量将保证一次只能有一个写入者写入。
class RWLock {
public:
RWLock()
: shared()
, readerQ(), writerQ()
, active_readers(0), waiting_writers(0), active_writers(0)
{}
void ReadLock() {
std::unique_lock<std::mutex> lk(shared);
while( waiting_writers != 0 )
readerQ.wait(lk);
++active_readers;
lk.unlock();
}
void ReadUnlock() {
std::unique_lock<std::mutex> lk(shared);
--active_readers;
lk.unlock();
writerQ.notify_one();
}
void WriteLock() {
std::unique_lock<std::mutex> lk(shared);
++waiting_writers;
while( active_readers != 0 || active_writers != 0 )
writerQ.wait(lk);
++active_writers;
lk.unlock();
}
void WriteUnlock() {
std::unique_lock<std::mutex> lk(shared);
--waiting_writers;
--active_writers;
if(waiting_writers > 0)
writerQ.notify_one();
else
readerQ.notify_all();
lk.unlock();
}
private:
std::mutex shared;
std::condition_variable readerQ;
std::condition_variable writerQ;
int active_readers;
int waiting_writers;
int active_writers;
};
乍一看我以为我认出了 是亚历山大·捷列霍夫介绍的同一个算法。但经过研究,我认为它是有缺陷的。两个作者可以同时等待 m_exclusive_cond
。当其中一个写入器唤醒并获得独占锁时,它将在 unlock
上设置 exclusive_waiting_blocked = false
,从而将互斥体设置为不一致状态。在那之后,互斥锁可能被清理了。
N2406,首先提出 std::shared_mutex
包含部分实现,下面用更新的语法重复。
class shared_mutex
{
mutex mut_;
condition_variable gate1_;
condition_variable gate2_;
unsigned state_;
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;
public:
shared_mutex() : state_(0) {}
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
};
// Exclusive ownership
void
shared_mutex::lock()
{
unique_lock<mutex> lk(mut_);
while (state_ & write_entered_)
gate1_.wait(lk);
state_ |= write_entered_;
while (state_ & n_readers_)
gate2_.wait(lk);
}
bool
shared_mutex::try_lock()
{
unique_lock<mutex> lk(mut_, try_to_lock);
if (lk.owns_lock() && state_ == 0)
{
state_ = write_entered_;
return true;
}
return false;
}
void
shared_mutex::unlock()
{
{
lock_guard<mutex> _(mut_);
state_ = 0;
}
gate1_.notify_all();
}
// Shared ownership
void
shared_mutex::lock_shared()
{
unique_lock<mutex> lk(mut_);
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
gate1_.wait(lk);
unsigned num_readers = (state_ & n_readers_) + 1;
state_ &= ~n_readers_;
state_ |= num_readers;
}
bool
shared_mutex::try_lock_shared()
{
unique_lock<mutex> lk(mut_, try_to_lock);
unsigned num_readers = state_ & n_readers_;
if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
{
++num_readers;
state_ &= ~n_readers_;
state_ |= num_readers;
return true;
}
return false;
}
void
shared_mutex::unlock_shared()
{
lock_guard<mutex> _(mut_);
unsigned num_readers = (state_ & n_readers_) - 1;
state_ &= ~n_readers_;
state_ |= num_readers;
if (state_ & write_entered_)
{
if (num_readers == 0)
gate2_.notify_one();
}
else
{
if (num_readers == n_readers_ - 1)
gate1_.notify_one();
}
}
该算法源自 Alexander Terekhov 的旧新闻组帖子。它既不饿死 reader 也不饿作家。
有两个"gates"、gate1_
和gate2_
。读者和作者必须通过 gate1_
,并且在尝试这样做时可能会被阻止。一旦 reader 超过 gate1_
,它就读锁定了互斥量。只要拥有所有权的 reader 的数量不超过 gate1_
,并且只要作者没有超过 gate1_
.
,读者就可以通过 gate1_
一次只有一位作家可以通过 gate1_
。即使 reader 拥有所有权,作家也可以通过 gate1_
。但是一旦过了gate1_
,作家仍然没有所有权。它必须先通过 gate2_
。在所有拥有所有权的 reader 都放弃它之前,作者无法通过 gate2_
。回想一下,当作者在 gate2_
等待时,新的 reader 无法通过 gate1_
。当一个作家在 gate2_
等待时,新作家也不能通过 gate1_
。
readers 和 writers 都在 gate1_
处被阻止,并且通过(几乎)相同的要求来通过它,这一特征使得该算法对 reader 都公平s 和作家,两者都不饿。
互斥量 "state" 有意保留在一个单词中,以表明部分使用原子(作为优化)用于某些状态更改是可能的(即对于无竞争的 "fast path").但是,此处未演示该优化。一个例子是,如果编写器线程可以自动将 state_
从 0 更改为 write_entered
,则他无需阻塞甚至 lock/unlock mut_
即可获得锁。 unlock()
可以用原子存储来实现。等等。这些优化在这里没有显示,因为它们比这个简单的描述听起来更难正确实施。
您可以使用一些好方法来提供帮助。
第一,表现不错。 VxWorks 以其出色的上下文切换时间而著称。无论您使用哪种锁定解决方案,它都可能涉及信号量。我不会害怕为此使用信号量(复数),它们在 VxWorks 中得到了很好的优化,快速的上下文切换时间有助于减少评估许多信号量状态等导致的性能下降。
另外我会忘记使用 POSIX 信号量,它们将简单地叠加在 VxWork 自己的信号量之上。 VxWorks 提供本机计数、二进制和互斥信号量;使用适合的那个会使它更快一点。二进制文件有时非常有用;多次发帖,从未超过1次。
其次,写比读更重要。当我在 VxWorks 中有这种需求并且一直使用信号量来控制访问时,我使用任务优先级来指示哪个任务更重要并且应该首先访问资源。这很好用;从字面上看,VxWorks 中的所有内容都是一个任务(嗯,线程),就像任何其他任务一样,包括所有设备驱动程序等。
VxWorks 还解决了优先级反转(Linus Torvalds 讨厌的事情)。因此,如果您使用信号量实现锁定,则可以依靠 OS 调度程序来调高优先级较低的 readers,如果它们阻塞了较高优先级的编写器。它可以导致更简单的代码,并且您也可以充分利用 OS。
因此一个潜在的解决方案 是使用一个 VxWorks 计数信号量保护资源,初始化为等于 reader 数量的值。每次 reader 想要读取时,它都会获取信号量(将计数减 1。每次读取完成后,它都会发布信号量,将计数增加 1。每次编写器想要写入时,它都会获取semaphore n (n = number of readers) 次,完成后发送 n 次。最后使 writer 任务的优先级高于任何 readers,并依赖 OS 快速上下文切换时间和优先级反转。
请记住,您是在硬实时 OS 而不是 Linux 上编程。采用/发布原生 VxWorks 信号量并不涉及与 Linux 上的类似行为相同数量的运行时间,尽管现在 Linux 也相当不错(我现在使用 PREEMPT_RT ). VxWorks 调度程序和所有设备驱动程序都可以依赖于其运行。如果你愿意,你甚至可以让你的写入任务成为整个系统中最高的优先级,甚至高于所有的设备驱动程序!
为了帮助解决问题,还请考虑您的每个线程正在做什么。 VxWorks 允许您指示任务 is/isn 不使用 FPU。如果您使用本机 VxWorks TaskSpawn 例程而不是 pthread_create 那么您将有机会指定它。这意味着如果您的 thread/task 没有进行任何浮点数学运算,并且您在调用 TaskSpawn 时已经这样说过,则上下文切换时间会更快,因为调度程序不会费心保留/恢复 FPU 状态。
这很有可能成为您正在开发的平台上的最佳解决方案。它发挥了 OS 的优势(快速信号量、快速上下文切换时间),而无需引入额外代码负载来重新创建其他平台上常见的替代(可能更优雅)解决方案。
第三,坚持使用旧的 GCC 和旧的 Boost。基本上,除了关于打电话给 WindRiver 和讨论购买升级的低价值建议外,我无能为力。就我个人而言,当我为 VxWorks 编程时,我使用的是 VxWork 的原生 API 而不是 POSIX。好吧,代码的可移植性不是很好,但最终还是很快的; POSIX 只是在本机 API 之上的一层,它总是会减慢速度。
也就是说,POSIX 计数和互斥信号量与 VxWork 的本机计数和互斥信号量非常相似。这可能意味着 POSIX 分层不是很厚。
关于 VxWorks 编程的一般注意事项
调试 我一直试图使用可用于 Solaris 的开发工具 (Tornado)。这是迄今为止我遇到过的最好的多线程调试环境。为什么?它允许您启动多个调试会话,一个对应系统中的每个 thread/task。您最终得到每个线程的调试 window,并且您正在单独且独立地调试每个线程。越过阻塞操作,调试 window 被阻塞。将鼠标焦点移动到另一个调试 window,跳过将释放块的操作并观察第一个 window 完成它的步骤。
您最终会进行大量调试 windows,但这是迄今为止调试多线程内容的最佳方式。它使编写非常复杂的东西和发现问题变得非常容易。您可以轻松探索应用程序中的不同动态交互,因为您可以简单而强大地控制每个线程随时执行的操作。
具有讽刺意味的是,Tornado 的 Windows 版本不允许您这样做;每个系统一个悲惨的单一调试 windows,就像任何其他无聊的旧 IDE,例如 Visual Studio 等。我什至从未见过现代的 IDE 接近任何地方在多线程调试方面与 Solaris 上的 Tornado 一样好。
HardDrives 如果您的 readers 和作家正在使用磁盘上的文件,请考虑 VxWorks 5.5 已经很老了。不会支持像 NCQ 这样的东西。在这种情况下,我提出的解决方案(上面概述)可能最好使用单个互斥信号量来阻止多个 reader 在读取磁盘的不同部分时相互绊倒。这取决于你的 reader 到底在做什么,但是如果他们从文件中读取连续的数据,这将避免在磁盘表面来回摆动 read/write 头(非常慢)。
在我的案例中,我使用了这个技巧来调整网络接口上的流量;每个任务都发送不同类型的数据,任务优先级反映了数据在网络上的优先级。这非常优雅,没有消息被碎片化,但重要的消息获得了可用带宽的最大份额。
现在微软已经开放了.NET源代码,你可以看看他们的ReaderWRiterLockSlim实现。
我不确定他们使用的更基本的原语是否对您可用,其中一些也是 .NET 库的一部分,并且他们的代码也可用。
Microsoft 在改进其锁定机制的性能方面花费了大量时间,因此这是一个很好的起点。
我们发现我们的代码中有几个地方,受互斥锁保护的数据的并发读取相当普遍,而写入则很少见。我们的测量结果似乎表明,使用简单的互斥量会严重阻碍读取该数据的代码的性能。所以我们需要的是 multiple-read/single-write 互斥锁。我知道这可以建立在更简单的原语之上,但在我自己尝试之前,我宁愿询问现有的知识:
构建 multiple-read/single-write 锁定更简单的同步原语的批准方法是什么?
我确实知道如何制作它,但我宁愿得到不偏不倚的答案(可能是错误的)。 (注意:我期望的是如何做的解释,可能是伪代码,而不是完整的实现。我当然可以自己编写代码。)
注意事项:
这需要有合理的表现。 (我的想法是每次访问需要两次 lock/unlock 操作。现在这可能还不够好,但需要很多操作似乎不合理。)
一般情况下,读的次数比较多,但是写比读更重要,对性能更敏感。读者不能饿死作家。
我们被困在一个相当老旧的嵌入式平台(VxWorks 5.5 的专有变体)上,有一个相当老旧的编译器(GCC 4.1.2)和 boost 1.52——除了 boost 的大部分部件依赖在 POSIX 上,因为 POSIX 未在该平台上完全实现。可用的锁定原语基本上是几种信号量(二进制、计数等),我们已经在其上创建了互斥体、条件变量和监视器。
这是IA32,单核。
Concurrent reads of data protected by a mutex are rather common, while writes are rare
这听起来像是 User-space RCU 的理想场景:
URCU is similar to its Linux-kernel counterpart, providing a replacement for reader-writer locking, among other uses. This similarity continues with readers not synchronizing directly with RCU updaters, thus making RCU read-side code paths exceedingly fast, while furthermore permitting RCU readers to make useful forward progress even when running concurrently with RCU updaters—and vice versa.
一如既往,最好的解决方案将取决于细节。 A read-write spin lock may be what you're looking for,但其他方法(如上面建议的读取-复制-更新)可能是一种解决方案 - 尽管在旧的嵌入式平台上,使用的额外内存可能是一个问题。对于罕见的写入,我经常使用任务系统安排工作,这样写入只能在没有从该数据结构读取时发生,但这取决于算法。
Concurrent Control with Readers and Writers; P.J. Courtois, F. Heymans, and D.L. Parnas; MBLE Research Laboratory; Brussels, Belgium 中描述了一种基于信号量和互斥量的算法。
这是基于我的 Boost headers 的简化答案(我将 Boost 称为 批准的方式 )。它只需要条件变量和互斥锁。我使用 Windows 原语重写了它,因为我发现它们具有描述性并且非常简单,但将其视为伪代码。
这是一个非常简单的解决方案,不支持互斥量升级或 try_lock() 操作等操作。如果你愿意,我可以添加这些。我还去掉了一些多余的东西,比如禁用并非绝对必要的中断。
此外,值得一试 boost\thread\pthread\shared_mutex.hpp
(这是基于此)。这是 human-readable.
class SharedMutex {
CRITICAL_SECTION m_state_mutex;
CONDITION_VARIABLE m_shared_cond;
CONDITION_VARIABLE m_exclusive_cond;
size_t shared_count;
bool exclusive;
// This causes write blocks to prevent further read blocks
bool exclusive_wait_blocked;
SharedMutex() : shared_count(0), exclusive(false)
{
InitializeConditionVariable (m_shared_cond);
InitializeConditionVariable (m_exclusive_cond);
InitializeCriticalSection (m_state_mutex);
}
~SharedMutex()
{
DeleteCriticalSection (&m_state_mutex);
DeleteConditionVariable (&m_exclusive_cond);
DeleteConditionVariable (&m_shared_cond);
}
// Write lock
void lock(void)
{
EnterCriticalSection (&m_state_mutex);
while (shared_count > 0 || exclusive)
{
exclusive_waiting_blocked = true;
SleepConditionVariableCS (&m_exclusive_cond, &m_state_mutex, INFINITE)
}
// This thread now 'owns' the mutex
exclusive = true;
LeaveCriticalSection (&m_state_mutex);
}
void unlock(void)
{
EnterCriticalSection (&m_state_mutex);
exclusive = false;
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
// Read lock
void lock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
while (exclusive || exclusive_waiting_blocked)
{
SleepConditionVariableCS (&m_shared_cond, m_state_mutex, INFINITE);
}
++shared_count;
LeaveCriticalSection (&m_state_mutex);
}
void unlock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
--shared_count;
if (shared_count == 0)
{
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
else
{
LeaveCriticalSection (&m_state_mutex);
}
}
};
行为
好的,这个算法的行为有些混乱,下面是它的工作原理。
写入锁定期间 - 读取器和写入器都被阻止。
在写锁结束时 - Reader 线程和一个写入线程将竞赛 以查看哪个开始.
在读取锁定期间 - 写入器被阻止。 Readers 也被阻止 当且仅当 作家被阻止时。
在释放最后一个读锁时 - Reader 线程和一个写入线程将竞速 以查看哪一个开始。
如果在通知期间处理器在 m_exclusive_cond
之前频繁将上下文切换到 m_shared_cond
线程,这 可能会 导致读者饿死作者,但我怀疑这个问题是理论上的而不是实际的,因为它是 Boost 的算法。
您似乎只有互斥量和 condition_variable 作为同步原语。因此,我在这里写了一个 reader-writer lock,它使 readers 饿死。它使用一个互斥量,两个 conditional_variable 和三个整数。
readers - readers in the cv readerQ plus the reading reader
writers - writers in cv writerQ plus the writing writer
active_writers - the writer currently writing. can only be 1 or 0.
它就这样饿死了 readers。如果有几个作家要写,readers 将永远没有机会阅读,直到所有作家都写完为止。这是因为稍后 readers 需要检查 writers
变量。同时,active_writers
变量将保证一次只能有一个写入者写入。
class RWLock {
public:
RWLock()
: shared()
, readerQ(), writerQ()
, active_readers(0), waiting_writers(0), active_writers(0)
{}
void ReadLock() {
std::unique_lock<std::mutex> lk(shared);
while( waiting_writers != 0 )
readerQ.wait(lk);
++active_readers;
lk.unlock();
}
void ReadUnlock() {
std::unique_lock<std::mutex> lk(shared);
--active_readers;
lk.unlock();
writerQ.notify_one();
}
void WriteLock() {
std::unique_lock<std::mutex> lk(shared);
++waiting_writers;
while( active_readers != 0 || active_writers != 0 )
writerQ.wait(lk);
++active_writers;
lk.unlock();
}
void WriteUnlock() {
std::unique_lock<std::mutex> lk(shared);
--waiting_writers;
--active_writers;
if(waiting_writers > 0)
writerQ.notify_one();
else
readerQ.notify_all();
lk.unlock();
}
private:
std::mutex shared;
std::condition_variable readerQ;
std::condition_variable writerQ;
int active_readers;
int waiting_writers;
int active_writers;
};
乍一看我以为我认出了 m_exclusive_cond
。当其中一个写入器唤醒并获得独占锁时,它将在 unlock
上设置 exclusive_waiting_blocked = false
,从而将互斥体设置为不一致状态。在那之后,互斥锁可能被清理了。
N2406,首先提出 std::shared_mutex
包含部分实现,下面用更新的语法重复。
class shared_mutex
{
mutex mut_;
condition_variable gate1_;
condition_variable gate2_;
unsigned state_;
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;
public:
shared_mutex() : state_(0) {}
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
};
// Exclusive ownership
void
shared_mutex::lock()
{
unique_lock<mutex> lk(mut_);
while (state_ & write_entered_)
gate1_.wait(lk);
state_ |= write_entered_;
while (state_ & n_readers_)
gate2_.wait(lk);
}
bool
shared_mutex::try_lock()
{
unique_lock<mutex> lk(mut_, try_to_lock);
if (lk.owns_lock() && state_ == 0)
{
state_ = write_entered_;
return true;
}
return false;
}
void
shared_mutex::unlock()
{
{
lock_guard<mutex> _(mut_);
state_ = 0;
}
gate1_.notify_all();
}
// Shared ownership
void
shared_mutex::lock_shared()
{
unique_lock<mutex> lk(mut_);
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
gate1_.wait(lk);
unsigned num_readers = (state_ & n_readers_) + 1;
state_ &= ~n_readers_;
state_ |= num_readers;
}
bool
shared_mutex::try_lock_shared()
{
unique_lock<mutex> lk(mut_, try_to_lock);
unsigned num_readers = state_ & n_readers_;
if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
{
++num_readers;
state_ &= ~n_readers_;
state_ |= num_readers;
return true;
}
return false;
}
void
shared_mutex::unlock_shared()
{
lock_guard<mutex> _(mut_);
unsigned num_readers = (state_ & n_readers_) - 1;
state_ &= ~n_readers_;
state_ |= num_readers;
if (state_ & write_entered_)
{
if (num_readers == 0)
gate2_.notify_one();
}
else
{
if (num_readers == n_readers_ - 1)
gate1_.notify_one();
}
}
该算法源自 Alexander Terekhov 的旧新闻组帖子。它既不饿死 reader 也不饿作家。
有两个"gates"、gate1_
和gate2_
。读者和作者必须通过 gate1_
,并且在尝试这样做时可能会被阻止。一旦 reader 超过 gate1_
,它就读锁定了互斥量。只要拥有所有权的 reader 的数量不超过 gate1_
,并且只要作者没有超过 gate1_
.
gate1_
一次只有一位作家可以通过 gate1_
。即使 reader 拥有所有权,作家也可以通过 gate1_
。但是一旦过了gate1_
,作家仍然没有所有权。它必须先通过 gate2_
。在所有拥有所有权的 reader 都放弃它之前,作者无法通过 gate2_
。回想一下,当作者在 gate2_
等待时,新的 reader 无法通过 gate1_
。当一个作家在 gate2_
等待时,新作家也不能通过 gate1_
。
readers 和 writers 都在 gate1_
处被阻止,并且通过(几乎)相同的要求来通过它,这一特征使得该算法对 reader 都公平s 和作家,两者都不饿。
互斥量 "state" 有意保留在一个单词中,以表明部分使用原子(作为优化)用于某些状态更改是可能的(即对于无竞争的 "fast path").但是,此处未演示该优化。一个例子是,如果编写器线程可以自动将 state_
从 0 更改为 write_entered
,则他无需阻塞甚至 lock/unlock mut_
即可获得锁。 unlock()
可以用原子存储来实现。等等。这些优化在这里没有显示,因为它们比这个简单的描述听起来更难正确实施。
您可以使用一些好方法来提供帮助。
第一,表现不错。 VxWorks 以其出色的上下文切换时间而著称。无论您使用哪种锁定解决方案,它都可能涉及信号量。我不会害怕为此使用信号量(复数),它们在 VxWorks 中得到了很好的优化,快速的上下文切换时间有助于减少评估许多信号量状态等导致的性能下降。
另外我会忘记使用 POSIX 信号量,它们将简单地叠加在 VxWork 自己的信号量之上。 VxWorks 提供本机计数、二进制和互斥信号量;使用适合的那个会使它更快一点。二进制文件有时非常有用;多次发帖,从未超过1次。
其次,写比读更重要。当我在 VxWorks 中有这种需求并且一直使用信号量来控制访问时,我使用任务优先级来指示哪个任务更重要并且应该首先访问资源。这很好用;从字面上看,VxWorks 中的所有内容都是一个任务(嗯,线程),就像任何其他任务一样,包括所有设备驱动程序等。
VxWorks 还解决了优先级反转(Linus Torvalds 讨厌的事情)。因此,如果您使用信号量实现锁定,则可以依靠 OS 调度程序来调高优先级较低的 readers,如果它们阻塞了较高优先级的编写器。它可以导致更简单的代码,并且您也可以充分利用 OS。
因此一个潜在的解决方案 是使用一个 VxWorks 计数信号量保护资源,初始化为等于 reader 数量的值。每次 reader 想要读取时,它都会获取信号量(将计数减 1。每次读取完成后,它都会发布信号量,将计数增加 1。每次编写器想要写入时,它都会获取semaphore n (n = number of readers) 次,完成后发送 n 次。最后使 writer 任务的优先级高于任何 readers,并依赖 OS 快速上下文切换时间和优先级反转。
请记住,您是在硬实时 OS 而不是 Linux 上编程。采用/发布原生 VxWorks 信号量并不涉及与 Linux 上的类似行为相同数量的运行时间,尽管现在 Linux 也相当不错(我现在使用 PREEMPT_RT ). VxWorks 调度程序和所有设备驱动程序都可以依赖于其运行。如果你愿意,你甚至可以让你的写入任务成为整个系统中最高的优先级,甚至高于所有的设备驱动程序!
为了帮助解决问题,还请考虑您的每个线程正在做什么。 VxWorks 允许您指示任务 is/isn 不使用 FPU。如果您使用本机 VxWorks TaskSpawn 例程而不是 pthread_create 那么您将有机会指定它。这意味着如果您的 thread/task 没有进行任何浮点数学运算,并且您在调用 TaskSpawn 时已经这样说过,则上下文切换时间会更快,因为调度程序不会费心保留/恢复 FPU 状态。
这很有可能成为您正在开发的平台上的最佳解决方案。它发挥了 OS 的优势(快速信号量、快速上下文切换时间),而无需引入额外代码负载来重新创建其他平台上常见的替代(可能更优雅)解决方案。
第三,坚持使用旧的 GCC 和旧的 Boost。基本上,除了关于打电话给 WindRiver 和讨论购买升级的低价值建议外,我无能为力。就我个人而言,当我为 VxWorks 编程时,我使用的是 VxWork 的原生 API 而不是 POSIX。好吧,代码的可移植性不是很好,但最终还是很快的; POSIX 只是在本机 API 之上的一层,它总是会减慢速度。
也就是说,POSIX 计数和互斥信号量与 VxWork 的本机计数和互斥信号量非常相似。这可能意味着 POSIX 分层不是很厚。
关于 VxWorks 编程的一般注意事项
调试 我一直试图使用可用于 Solaris 的开发工具 (Tornado)。这是迄今为止我遇到过的最好的多线程调试环境。为什么?它允许您启动多个调试会话,一个对应系统中的每个 thread/task。您最终得到每个线程的调试 window,并且您正在单独且独立地调试每个线程。越过阻塞操作,调试 window 被阻塞。将鼠标焦点移动到另一个调试 window,跳过将释放块的操作并观察第一个 window 完成它的步骤。
您最终会进行大量调试 windows,但这是迄今为止调试多线程内容的最佳方式。它使编写非常复杂的东西和发现问题变得非常容易。您可以轻松探索应用程序中的不同动态交互,因为您可以简单而强大地控制每个线程随时执行的操作。
具有讽刺意味的是,Tornado 的 Windows 版本不允许您这样做;每个系统一个悲惨的单一调试 windows,就像任何其他无聊的旧 IDE,例如 Visual Studio 等。我什至从未见过现代的 IDE 接近任何地方在多线程调试方面与 Solaris 上的 Tornado 一样好。
HardDrives 如果您的 readers 和作家正在使用磁盘上的文件,请考虑 VxWorks 5.5 已经很老了。不会支持像 NCQ 这样的东西。在这种情况下,我提出的解决方案(上面概述)可能最好使用单个互斥信号量来阻止多个 reader 在读取磁盘的不同部分时相互绊倒。这取决于你的 reader 到底在做什么,但是如果他们从文件中读取连续的数据,这将避免在磁盘表面来回摆动 read/write 头(非常慢)。
在我的案例中,我使用了这个技巧来调整网络接口上的流量;每个任务都发送不同类型的数据,任务优先级反映了数据在网络上的优先级。这非常优雅,没有消息被碎片化,但重要的消息获得了可用带宽的最大份额。
现在微软已经开放了.NET源代码,你可以看看他们的ReaderWRiterLockSlim实现。
我不确定他们使用的更基本的原语是否对您可用,其中一些也是 .NET 库的一部分,并且他们的代码也可用。
Microsoft 在改进其锁定机制的性能方面花费了大量时间,因此这是一个很好的起点。