crossbeam::epoch 内存回收

Memory reclamation with crossbeam::epoch

我在 crossbeam 中遇到内存回收问题。假设您正在实现一个简单的线程安全无锁容器,它包含一个值。任何线程都可以获得存储值的克隆,并且该值可以随时更新,之后读者开始观察新值的克隆。

虽然典型的用例是将 Arc<X> 之类的东西指定为 T,但实现不能依赖 T 是指针大小的 - 例如,X 可能是一个特征,导致一个胖指针 Arc<X>。但是对任意 T 的无锁访问似乎非常适合 epoch-based lock-free code。基于这些例子,我想出了这个:

extern crate crossbeam;

use std::thread;
use std::sync::atomic::Ordering;

use crossbeam::epoch::{self, Atomic, Owned};

struct Container<T: Clone> {
    current: Atomic<T>,
}

impl<T: Clone> Container<T> {
    fn new(initial: T) -> Container<T> {
        Container { current: Atomic::new(initial) }
    }

    fn set_current(&self, new: T) {
        let guard = epoch::pin();
        let prev = self.current.swap(Some(Owned::new(new)),
                                     Ordering::AcqRel, &guard);
        if let Some(prev) = prev {
            unsafe {
                // once swap has propagated, *PREV will no longer
                // be observable
                //drop(::std::ptr::read(*prev));
                guard.unlinked(prev);
            }
        }
    }

    fn get_current(&self) -> T {
        let guard = epoch::pin();
        // clone the latest visible value
        (*self.current.load(Ordering::Acquire, &guard).unwrap()).clone()
    }
}

当与不分配的类型一起使用时,例如使用 T=u64,效果很好 - set_currentget_current 可以被调用数百万次而不会泄漏。 (进程监视器显示由于 epoch pseudo-gc 而导致的轻微内存振荡,正如预期的那样,但没有长期增长。)但是,当 T 是分配的类型时,例如Box<u64>,可以很容易地观察到泄漏。例如:

fn main() {
    use std::sync::Arc;
    let c = Arc::new(Container::new(Box::new(0)));
    const ITERS: u64 = 100_000_000;
    let producer = thread::spawn({
        let c = Arc::clone(&c);
        move || {
            for i in 0..ITERS {
                c.set_current(Box::new(i));
            }
        }
    });
    let consumers: Vec<_> = (0..16).map(|_| {
        let c = Arc::clone(&c);
        thread::spawn(move || {
            let mut last = 0;
            loop {
                let current = c.get_current();
                if *current == ITERS - 1 {
                    break;
                }
                assert!(*current >= last);
                last = *current;
            }
        })}).collect();
    producer.join().unwrap();
    for x in consumers {
        x.join().unwrap();
    }
}

运行 该程序显示内存使用量稳步显着增加,最终消耗的内存量与迭代次数成正比。

根据the blog post introducing it,Crossbeam的纪元开垦"does not run destructors, but merely deallocates memory"。 Treiber 堆栈示例中的 try_pop 使用 ptr::read(&(*head).data)head.data 中包含的值移出要释放的 head 对象。数据对象的所有权被转移给调用者,调用者要么将它移动到别处,要么在它超出范围时释放它。

这将如何转化为上面的代码? setter 是否是 guard.unlinked 的正确位置,或者如何确保 drop 在基础对象上是 运行?取消注释显式 drop(ptr::read(*prev)) 会导致检查单调性的断言失败,可能表明过早释放。

问题的症结在于(正如您自己已经发现的那样)guard.unlinked(prev) 推迟了以下代码的执行:

drop(Vec::from_raw_parts(prev.as_raw(), 0, 1));

但是您希望它推迟:

drop(Vec::from_raw_parts(prev.as_raw(), 1, 1));

或者,等价地:

drop(Box::from_raw(prev.as_raw());

换句话说,unlinked只是释放存储对象的内存,而不是删除对象本身。

这是Crossbeam目前已知的一个痛点,幸好很快就会解决。 Crossbeam 的基于纪元的垃圾收集器目前正在进行重新设计和重写,以便:

  • 允许延迟删除和任意延迟函数
  • 逐步收集垃圾以尽量减少暂停
  • 避免线程本地垃圾袋过度拥挤
  • 更热心收集大件垃圾
  • 修复 API
  • 中的健全性问题

如果您想了解有关新 Crossbeam 设计的更多信息,请查看 RFCs repository. I suggest starting with the RFC on new Atomic and the RFC on new GC

我创建了一个实验性的箱子,Coco,它与 Crossbeam 的新设计有很多共同点。如果您现在需要解决方案,我建议您改用它。但请记住,一旦我们发布新版本(可能是本月或下个月),Coco 将被弃用,取而代之的是 Crossbeam。

作为 的一些细节,当前 Crossbeam 的一个已知限制是它仅支持重新分配,而不支持完全删除已变得无法访问但其他线程可能仍然可见的对象。这不会影响 Crossbeam 支持的无锁集合,它会自动删除集合用户的项目 "observed" - 不允许偷看。这符合队列或堆栈的需要,但不符合例如队列的需要。无锁地图。

coco crate 解决了这个问题,它定义了几个并发集合并作为下一代 Crossbeam 设计的预览。它支持延迟删除值。这是使用 coco 的 Container 的再现:

use std::thread;
use std::sync::atomic::Ordering;

use coco::epoch::{self, Atomic, Owned};

struct Container<T: Clone> {
    current: Atomic<T>,
}

impl<T: Clone> Container<T> {
    fn new(initial: T) -> Container<T> {
        Container { current: Atomic::new(initial) }
    }

    fn set_current(&self, new: T) {
        epoch::pin(|scope| {
            let prev = self.current.swap(Owned::new(new).into_ptr(&scope),
                                         Ordering::AcqRel, &scope);
            unsafe {
                scope.defer_drop(prev);
            }
        })
    }

    fn get_current(&self) -> T {
        epoch::pin(|scope| {
            let obj_ref = unsafe {
                self.current.load(Ordering::Acquire, &scope).as_ref().unwrap()
            };
            obj_ref.clone()
        })
    }
}

当运行与问题中的main()相同时,不会泄漏内存。

需要考虑的一件事是,根据文档,epoch::pin() 附带 SeqCst 栅栏和一些原子操作的成本。 (请注意,epoch::pin() 在 Crossbeam 下也不是免费的,而且实际上要贵得多。)现代硬件上 10-15 ns 的延迟可能与大多数用途无关,但用户在使用时应该注意这一点编写代码试图从无锁操作中挤出每一纳秒。