c ++ 11:具有互斥锁的线程看到原子变量的值发生变化,尽管这是唯一可以更改它的代码

c++11: thread with mutex sees atomic variable's value changing despite this being the only code that can change it

正在更新一个原子变量(在本例中为 128 位结构),令唯一能够更新它的线程感到惊讶。怎么会?

这是一个最小的例子,所以它没有做任何有意义的事情,但是:一个 alloc() 函数 return 一个 malloc 缓冲区 100 次,然后分配一个新的缓冲区,它将 return100次,以此类推,即使面对被多线程调用。

我有一个原子变量,它是一个带有指针的结构、一个 32 位 int 和另一个旨在避免 ABA 问题的 32 位计数器。

我有一个包含两个部分的函数。第一部分将 如果 return 计数非零 ,CAS 结构以减少 return 计数(并增加 ABA 计数器),然后 return指针。否则,第二部分获得互斥体,为新指针分配内存,CAS 将小结构完全包含新指针、新的非零 return 计数器,以及 ABA 计数器的增量。

简而言之,当计数器大于零时,每个线程都可以更新这个结构。但是一旦它为零,第一个获取互斥量的线程将 我认为 是唯一可以再次 CAS 更新此结构的线程。

除了有时这个 CAS 会失败! "How can it fail"是我的问题。

这是一个 运行 示例。它可以用 g++ lockchange.cxx -o lockchange -latomic -pthread 编译。它在 Fedora 31 上的 gcc version 9.2.1 20190827 (Red Hat 9.2.1-1) (GCC) 上运行。

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cassert>
#include <cstring>
#include <mutex>
#include <thread>
#include <vector>

using namespace std;


struct MyPair { /* Hungarian: pair */

    char*    pc;         /* a buffer to be used n times */
    int32_t  iRemaining; /* number of times left to use pc */
    uint32_t iUpdates;   /* to avoid ABA problem */
};



const int iThreads{ 200 };
const int iThreadIterations{ 1000000 };
const int iSizeItem{ 128 };

mutex mux;

atomic<MyPair> pairNext;



char* alloc() {

 TRY_AGAIN:
  MyPair pairCur = pairNext.load();

  // CASE 1: We can use the existing buffer?

  while ( pairCur.iRemaining ) {
      char* pcRV = pairCur.pc;

      MyPair pairNew = { pairCur.pc,
                         pairCur.iRemaining - 1,
                         pairCur.iUpdates + 1 };

      if ( pairNext.compare_exchange_weak( pairCur, pairNew ) )
          return pcRV;

      // Otherwise, pairNext was changed out from under us and pairCur
      // will have been updated.  Try again, as long as iRemaining
      // non-zero.
  }



  // CASE 2: We've used pc as many times as allowed, so allocate a new pc.

  // Get a mutex as we'll be changing too many fields to do atomically.
  lock_guard<mutex> guard( mux );

  // If multiple threads saw iRemaining = 0, they all will
  // have tried for the mutex; only one will have gotten it, so
  // there's a good chance that by the time we get the mutex, a
  // sibling thread will have allocated a new pc and placed it at
  // pairNext, so we don't need to allocate after all.

  if ( pairNext.load().iRemaining ) // <=============================== it's as if this line isn't seeing the update made by the line below in real time.
      goto TRY_AGAIN;

  // Get a new buffer.
  char* pcNew = (char*) malloc( iSizeItem );

  MyPair pairNew = { pcNew, 100, pairCur.iUpdates + 1 };

  if ( pairNext.compare_exchange_strong( pairCur, pairNew ) ) { //<===== the update that's not being seen above in real time
      // *** other stuff with pcNew that needs mutex protection ***;
      return pcNew;

  } else {

      // CASE 2c: after allocating a new page, we find that
      // another thread has beaten us to it.  I CAN'T FIGURE OUT
      // HOW THAT'S POSSIBLE THOUGH.  Our response should be safe
      // enough: put our allocation back, and start all over again
      // because who knows what else we missed.  I see this error
      // like 813 times out of 40 BILLION allocations in the
      // hammer test, ranging from 1 to 200 threads.

      printf( "unexpected: had lock but pairNext changed when iRemaining=0\n" );
      // In fact the following free and goto should and seem to
      // recover fine, but to be clear my question is how we can
      // possibly end up here in the first place.
      abort();
      free( pcNew );
      goto TRY_AGAIN;
  }
}



void Test( int iThreadNumber ) {

  for ( int i = 0; i < iThreadIterations; i++ )
      alloc();
}



int main( int nArg, char* apszArg[] ) {

  vector<thread> athr;

  for ( int i = 0; i < iThreads; i++ )
      athr.emplace_back( Test, i );

  for ( auto& thr: athr )
      thr.join();
}

这个问题被称为 "ABA Problem,",我可以将其总结为检查无锁多线程编码中的变量并认为它没有改变,但它已经改变了。

这里,iRemaining是一个计数器,设置为100,然后重复倒数到0。

互斥量被锁定后,一个"optimization check"(不需要确保正确性,只是为了避免分配新缓冲区和重置iRemaining等的开销,如果另一个线程已经完成所以)天真地检查 iRemaining == 0 以确定结构 pairCur 在获取锁期间没有改变(这可能确实需要等待很长时间)。

实际上发生的是当线程 A 等待获取锁时,很少,但是考虑到数十亿次试验,相当多的次数iRemaining 正在递减 100 倍的精确倍数。通过让代码 运行 abort() 然后查看变量,我看到 pairNext 的值是 { pc = XXX, iRemaining = 0, iUpdates = 23700 }pairNew{ pc = YYY, iRemaining = 100, iUpdates = 23600 } . iUpdates 现在比我们想象的高了 100! 换句话说,在我们等待锁定的时候又进行了 100 次更新,这就是要转的确切数字 iRemaining 再次为 0。这也意味着 pc 与以前不同,

该结构已经有一个"update counter" iUpdates,这是避免ABA问题的标准解决方案。如果我们不检查 iRemaining == 0 而检查 iUpdates 是否与我们的预锁定原子快照相同,那么优化启发式将变得 100% 有效并且我们永远不会遇到意外的 printf()abort()。 (好吧,它可能仍然会发生,但现在需要一个线程被阻塞以进行 2^32 次操作的精确倍数,而不是仅仅 100 次操作,而且这可能每年、十年或一个世纪只发生一次,如果可能的话这种架构。)这是改进后的代码:

  if ( pairNext.load().iUpdates != pairCur.iUpdates ) // <=============================== it's as if this line isn't seeing the update made by the line below in real time.

请注意 goto TRY_AGAIN; 解锁了互斥锁,因为您要跳回 lock_guard<mutex> 构造之前。通常人们将 {} 放在顶部带有锁定的范围内,以明确这一点(并控制何时解锁)。我没有检查 ISO C++ 规则来查看这是否是必需的行为,但至少 G++ 和 clang++ 实现它的方式 goto 确实解锁了。 (将 RAII 锁定与 goto 混合使用似乎是糟糕的设计)。

另请注意,您确实在持有互斥锁时重新加载 pairNext 一次,但丢弃该值并保留 pairCur 作为 CAS 尝试的 "expected" 值.

要达到临界区内的 CAS,pairNext.iRemaining 必须是

  • 仍然 零(例如,该线程赢得了获取锁的竞争)。您假设 CAS 成功的情况是因为 pairNext == pairCur.
  • 或零 再次 在另一个线程或多个线程将 iRemaining 设置为 100 并在该线程处于睡眠状态时将其一直递减到零。如果线程多于内核,这很容易发生。但是,即使有很多内核,它总是可能的:中断可以暂时阻塞线程,或者当它发现互斥锁时它的退避策略可能导致它在计数器再次为零之前不重试。

我添加了新的调试代码,使这一点变得清晰:

 lock_guard<mutex> guard( mux );    // existing code

 if ( pairNext.load().iRemaining )
      goto TRY_AGAIN;

  // new debugging code
  MyPair tmp = pairNext.load();
  if (memcmp(&tmp, &pairCur, sizeof(tmp)) != 0)
          printf("pairNext changed between retry loop and taking the mutex\n"
                "cur  = %p, %d, %u\n"
                "next = %p, %d, %u\n",
                pairCur.pc, pairCur.iRemaining, pairCur.iUpdates,
                tmp.pc, tmp.iRemaining, tmp.iUpdates);
$ clang++ -g -O2 lc.cpp -o lockchange -latomic -pthread && ./lockchange 
pairNext changed between retry loop and taking the mutex
cur  = 0x7f594c000e30, 0, 808
next =  0x7f5940000b60, 0, 909
unexpected: had lock but pairNext changed when iRemaining=0
Aborted (core dumped)

解决这个问题:

由于您正在重新加载 pairNext 并持有互斥锁,因此只需将该值用作 CAS 的 "expected"。遗憾的是,编译器不会优化 foo.load().member 以仅加载该成员:它们仍然会在 x86-64 或其他 ISA 上使用 lock cmpxchg16b 加载整个 16 字节对象。所以无论如何你都要支付全部费用。

  lock_guard<mutex> guard( mux );

  pairCur = pairNext.load();   // may have been changed by other threads
  if ( pairCur.iRemaining )
      goto TRY_AGAIN;

  // then same as before, use it for CAS
  // no other thread can be in the critical section, 
  // and the code outside won't do anything while pairNext.iRemaining == 0

无论如何,16 字节原子加载的成本与 CAS 相同,但失败路径必须释放 malloc 缓冲区或自旋直到 CAS 在离开临界区之前成功。如果你能避免浪费太多 CPU 时间并引起争用,后者实际上可以工作,例如_mm_pause().