多个 RwLock::write 在 Rust 中等待的提前停止

Early stop of multiple RwLock::write waiting in Rust

我的 Rust 代码使用 RwLock 在多线程中处理数据。每个线程在使用 read 锁时填充一个公共存储(例如填充数据库,但我的情况有点不同)。最终,公共存储空间将被填满。我需要暂停所有处理,重新分配存储 space(例如,从云端分配更多磁盘 space),然后继续。

// psudo-code
fn thread_worker(tasks) {
  let lock = rwlock.read().unwrap();
  for task in tasks {
    // please ignore out_of_space check race condition
    // it's here just to explain the question 
    if out_of_space {
      drop(lock);
      let write_lock = rwlock.write().unwrap();
      // get more storage
      drop(write_lock);
      lock = rwlock.read().unwrap();
    }
    // handle task WITHOUT getting a read lock on every pass
    // getting a lock is far costlier than actual task processing
  }
  drop(lock);
}

由于所有的线程都会在大约同一时间快速命中space,它们都可以释放read锁,并获得write。第一个获得 write 锁的线程将解决存储问题。但是现在我有一个可能的临时死锁情况 - 所有其他线程也在等待 write 锁,即使它们不再需要它。

所以有可能出现这种情况:假设有 3 个线程都在等待 write,第一个线程获得 write,修复问题,释放 write,然后等待read。第二个输入 write 但很快就跳过了,因为问题已经解决并发布了。第一个和第二个线程将进入 read 并继续处理,但第三个线程仍在等待 write 并且会等待很长时间,直到前两个 运行 退出space 或完成他们所有的工作。

给定所有等待 write 的线程,我如何“中止”所有其他线程在第一个线程完成其工作后但在释放 write 锁之前等待它已经获得?

我看到有一个 poisoning 功能,但它是为恐慌而设计的,将其重新用于生产似乎是错误的,而且很难正确完成。此外,Rust 开发人员正在 thinking 删除它。

P.S。每个循环迭代本质上是一个 data[index] = value 赋值,其中 data 是一个由许多线程共享的巨大内存映射。 index 在所有线程中缓慢增长,因此最终所有线程 运行 超出内存映射大小。当发生这种情况时,memmap 被销毁,文件重新分配,并创建一个新的 memmap。因此,不可能在每次循环迭代时都获得读锁。

正在查看 your code, 您可以使用额外的互斥锁:

// pseudo-code
fn thread_worker(tasks) {
  for task in tasks {
    if out_of_space {
      drop(lock);
      {
        let mutex = mutex.lock();      
        if out_of_space { // potentially updated by another worker
          let write_lock = rwlock.write();
          // get more storage
          ...
          // drop(write_lock); is automatic here
        }
        // drop(mutex); is automatic here
      }
      lock = rwlock.read();
    }

    // copy memory for the task
    ...
  }
}

此处使用的模式称为 Double-checked locking

这解决了您遇到的问题,即在重新分配后,下一方不会永远等待 rwlock.write,因为它不会通过互斥临界区内的 out_of_space 检查。

然而这个解决方案仍然有一个问题,第一个失败的工人将等待所有其他工人遇到out_of_space条件才能继续重新分配,因为它需要等待 all read() 锁被删除。

我建议重构此代码以将重新分配逻辑移出此方法。

如果可能,也尽量避免显式删除,以支持或RAII这通常是一个好习惯。

首先请注意,根据您的目标平台,您的代码可能已经按原样运行。例如,对于 Rust 线​​程依赖 libpthread 的平台(例如 Linux),以及写锁优先于读锁的任何平台。

如果你想要一个跨平台的解决方案,你需要做的就是切换到parking-lot which provides a fair implementation of a RwLock. In particular this means that readers trying to acquire the lock will block even if the lock is unlocked when there are writers waiting to acquire the lock

这是公平的事件顺序RwLock

  • 最初所有线程都运行宁并持有读锁。
  • space 中第一个到 运行 的线程释放读锁并请求写锁。由于其他线程仍然持有读锁,第一个线程被阻塞。
  • 一个接一个,其他线程运行出space,释放读锁,请求写锁。
  • 一旦所有线程都释放了读锁,其中一个线程获得了写锁。
  • 获得写锁的线程分配更多内存,释放写锁,请求读锁。 由于等待写锁的其他线程优先,读请求阻塞。
  • 一个接一个,其他线程获取写锁,注意到有可用内存,释放写锁,请求读锁。
  • 一旦所有线程都获取并释放了写锁,它们都会获取读锁并继续。

请注意,存在一种理论上的竞争条件,如果其他线程能够在释放读锁和请求写锁所需的时间内继续执行,则一旦分配了内存,其中一个线程就会被阻塞,例如:

drop(lock);
// Another thread gets the write lock, allocates memory and releases the lock
// All the other threads acquire and release the write lock
// At least one other thread acquires the read lock
let write_lock = rwlock.write().unwrap();

考虑到单独分配内存所花费的时间,这种情况在现实生活中发生的可能性小得可以忽略不计。