我不明白在C++11中如何实现乐观并发

I don't understand how can optimistic concurrency be implemented in C++11

我试图在 C++11 中实现一个不使用锁的受保护变量。我读过一些关于乐观并发的内容,但我不明白它怎么能在 C++ 和任何语言中实现。

我尝试实现乐观并发的方法是使用 'last modification id'。我正在做的过程是:

我看到的问题是,比较 'last modification ids'(本地副本和当前副本)和提交之前更改,无法确保没有其他线程修改受保护变量的值

下面是一个代码示例。让我们假设有许多线程执行该代码并共享变量 var.

/**
 * This struct is pretended to implement a protected variable,
 * but using optimistic concurrency instead of locks.
 */
struct ProtectedVariable final {

   ProtectedVariable() : var(0), lastModificationId(0){ }

   int getValue() const {
      return var.load();
   }

   void setValue(int val) {
      // This method is not atomic, other thread could change the value
      // of val before being able to increment the 'last modification id'.
      var.store(val);
      lastModificationId.store(lastModificationId.load() + 1);
   }

   size_t getLastModificationId() const {
      return lastModificationId.load();
   }

private:
   std::atomic<int> var;
   std::atomic<size_t> lastModificationId;
};



ProtectedVariable var;


/**
 * Suppose this method writes a value in some sort of database.
 */
int commitChanges(int val){
   // Now, if nobody has changed the value of 'var', commit its value,
   // retry the transaction otherwise.
   if(var.getLastModificationId() == currModifId) {

      // Here is one of the problems. After comparing the value of both Ids, other
      // thread could modify the value of 'var', hence I would be
      // performing the commit with a corrupted value.
      var.setValue(val);

      // Again, the same problem as above.
      writeToDatabase(val);

      // Return 'ok' in case of everything has gone ok.
      return 0;
   } else {
      // If someone has changed the value of var while trying to 
      // calculating and commiting it, return error;
      return -1;
   }
}

/**
 * This method is pretended to be atomic, but without using locks.
 */
void modifyVar(){
   // Get the modification id for checking whether or not some
   // thread has modified the value of 'var' after commiting it.
   size_t currModifId = lastModificationId.load();

   // Get a local copy of 'var'.
   int currVal = var.getValue();

   // Perform some operations basing on the current value of
   // 'var'.
   int newVal = currVal + 1 * 2 / 3;

   if(commitChanges(newVal) != 0){
      // If someone has changed the value of var while trying to 
      // calculating and commiting it, retry the transaction.
      modifyVar();
   }
}

我知道上面的代码有错误,但我不明白如何以正确的方式实现上面的代码,没有错误。

如果我理解你的问题,你的意思是确保 varlastModificationId 要么都改变了,要么都不改变。

为什么不使用 std::atomic<T>,其中 T 是包含 intsize_t 的结构?

struct VarWithModificationId {
  int var;
  size_t lastModificationId;
};

class ProtectedVariable {
  private std::atomic<VarWithModificationId> protectedVar;

  // Add your public setter/getter methods here
  // You should be guaranteed that if two threads access protectedVar, they'll each get a 'consistent' view of that variable, but the setter will need to use a lock
};

这里的关键是acquire-release语义和test-and-increment。 Acquire-release 语义是您执行操作顺序的方式。 Test-and-increment 是您在比赛中选择哪个线程获胜的方式。

因此您的问题是.store(lastModificationId+1)。您需要 .fetch_add(1)。它 returns 旧值。如果这不是预期值(从 before 你的阅读),那么你输了比赛并重试。

乐观并发并不意味着你不用锁,它仅仅意味着你在大部分操作期间不保留锁。

我们的想法是将您的修改分成三个部分:

  1. 初始化,比如获取lastModificationId。这部分可能需要加锁,但不一定。
  2. 实际计算。所有昂贵或阻塞的代码都放在这里(包括任何磁盘写入或网络代码)。结果的编写方式不会掩盖以前的版本。它的可能工作方式是将新值存储在旧值旁边,按 not-yet-commited 版本索引。
  3. 原子提交。这部分是有锁的,一定要短、简单、不阻塞。它可能的工作方式是它只是增加版本号——在确认之后,没有其他版本同时提交。此阶段没有数据库写入。

这里的主要假设是计算部分比提交部分昂贵得多。如果你的修改是微不足道的并且计算成本低,那么你可以直接使用锁,这样就简单多了。

分为这 3 个部分的一些示例代码可能如下所示:

struct Data {
  ...
}

...

std::mutex lock;
volatile const Data* value;  // The protected data
volatile int current_value_version = 0;

...

bool modifyProtectedValue() {
  // Initialize.
  int version_on_entry = current_value_version;

  // Compute the new value, using the current value.
  // We don't have any lock here, so it's fine to make heavy
  // computations or block on I/O.
  Data* new_value = new Data;
  compute_new_value(value, new_value);

  // Commit or fail.
  bool success;
  lock.lock();
  if (current_value_version == version_on_entry) {
    value = new_value;
    current_value_version++;
    success = true;
  } else {
    success = false;
  }
  lock.unlock();
  
  // Roll back in case of failure.
  if (!success) {
    delete new_value;
  }

  // Inform caller about success or failure.
  return success;
}

// It's cleaner to keep retry logic separately.
bool retryModification(int retries = 5) {
  for (int i = 0; i < retries; ++i) {
    if (modifyProtectedValue()) {
      return true;
    }
  }
  return false;
}

这是一个非常基本的方法,尤其是回滚是微不足道的。在现实世界的例子中 re-creating 整个数据对象(或其对应物)可能是不可行的,因此版本控制必须在内部某处完成,并且回滚可能要复杂得多。但我希望它能显示出总体思路。

Optimistic concurrency 在数据库引擎中使用,当预计不同的用户很少会访问相同的数据时。它可以是这样的:

First user reads data and timestamp. Users handles the data for some time, user checks if the timestamp in the DB hasn't changes since he read the data, if it doesn't then user updates the data and the timestamp.

但是,在内部 DB-engine 无论如何都使用锁进行更新,在此锁期间,它会检查时间戳是否已更改,如果未更改,则引擎会更新数据。与悲观并发相比,数据被锁定的时间更短。而且你还需要使用某种锁定。