同时可变访问保证不相交的大向量的任意索引

Simultaneous mutable access to arbitrary indices of a large vector that are guaranteed to be disjoint

上下文

我有一个案例,多个线程必须更新存储在共享向量中的对象。但是vector很大,要更新的元素个数比较少。

问题

在一个最小的示例中,要更新的元素集可以由包含要更新的元素索引的(散列)集来标识。因此,代码可能如下所示:

let mut big_vector_of_elements = generate_data_vector();

while has_things_to_do() {
    let indices_to_update = compute_indices();
    indices_to_update.par_iter() // Rayon parallel iteration
       .map(|index| big_vector_of_elements[index].mutate())
       .collect()?;
}

这在 Rust 中显然是不允许的:big_vector_of_elements 不能同时在多个线程中可变地借用。但是,将每个元素包装在例如 Mutex 锁中似乎是不必要的:这种特定情况在没有显式同步的情况下是安全的。由于索引来自一个集合,因此可以保证它们是不同的。 par_iter 中没有两次迭代触及向量的相同元素。

重述我的问题

编写并行改变向量中元素的程序的最佳方式是什么,其中同步已通过索引的选择处理,但编译器不理解后者?

一个接近最优的解决方案是将 big_vector_of_elements 中的所有元素包装在某个假设的 UncontendedMutex 锁中,这将是 Mutex 的变体,在无竞争的情况下速度快得离谱情况下,当发生争用(甚至恐慌)时,这可能会花费任意长的时间。理想情况下,对于任何 T.

UncontendedMutex<T> 也应与 T 具有相同的大小和对齐方式

相关但不同的问题:

可以用 "use Rayon's parallel iterator"、"use chunks_mut" 或 "use split_at_mut" 回答多个问题:

这些答案在这里似乎并不相关,因为这些解决方案意味着遍历整个 big_vector_of_elements,然后针对每个元素确定是否需要更改任何内容。本质上,这意味着这样的解决方案如下所示:

let mut big_vector_of_elements = generate_data_vector();

while has_things_to_do() {
    let indices_to_update = compute_indices();
    for (index, mut element) in big_vector_of_elements.par_iter().enumerate() {
        if indices_to_update.contains(index) {
            element.mutate()?;
        }
    }
}

此解决方案花费的时间与 big_vector_of_elements 的大小成正比,而第一个解决方案仅在与 indices_to_update 的大小成正比的元素数量上循环。

您可以排序 indices_to_update 并通过调用 split_*_mut 提取可变引用。

let len = big_vector_of_elements.len();

while has_things_to_do() {
    let mut tail = big_vector_of_elements.as_mut_slice();

    let mut indices_to_update = compute_indices();
    // I assumed compute_indices() returns unsorted vector
    // to highlight the importance of sorted order
    indices_to_update.sort();

    let mut elems = Vec::new();

    for idx in indices_to_update {
        // cut prefix, so big_vector[idx] will be tail[0]
        tail = tail.split_at_mut(idx - (len - tail.len())).1;

        // extract tail[0]
        let (elem, new_tail) = tail.split_first_mut().unwrap();
        elems.push(elem);

        tail = new_tail;
    }
}

仔细检查此代码中的所有内容;我没有测试它。然后你可以调用 elems.par_iter(...) 或其他任何东西。

我认为这是使用 unsafe 代码的合理位置。逻辑本身是安全的,但不能被编译器检查,因为它依赖于类型系统之外的知识(BTreeSet 的契约,它本身依赖于 Ord 的实现和 [=15 的朋友=]).

在此示例中,我们通过 range 预先对所有索引进行边界检查,因此对 add 的每次调用都可以安全使用。由于我们接受了一个集合,我们知道所有索引都是不相交的,所以我们没有引入可变别名。从切片中获取原始指针以避免切片本身和返回值之间的别名很重要。

use std::collections::BTreeSet;

fn uniq_refs<'i, 'd: 'i, T>(
    data: &'d mut [T],
    indices: &'i BTreeSet<usize>,
) -> impl Iterator<Item = &'d mut T> + 'i {
    let start = data.as_mut_ptr();
    let in_bounds_indices = indices.range(0..data.len());

    // I copied this from a Stack Overflow answer
    // without reading the text that explains why this is safe
    in_bounds_indices.map(move |&i| unsafe { &mut *start.add(i) })
}

use std::iter::FromIterator;

fn main() {
    let mut scores = vec![1, 2, 3];

    let selected_scores: Vec<_> = {
        // The set can go out of scope after we have used it.
        let idx = BTreeSet::from_iter(vec![0, 2]);
        uniq_refs(&mut scores, &idx).collect()
    };

    for score in selected_scores {
        *score += 1;
    }

    println!("{:?}", scores);
}

使用此函数找到所有单独的可变引用后,您可以使用 Rayon 并行修改它们:

use rayon::prelude::*; // 1.0.3

fn example(scores: &mut [i32], indices: &BTreeSet<usize>) {
    let selected_scores: Vec<_> = uniq_refs(scores, indices).collect();
    selected_scores.into_par_iter().for_each(|s| *s *= 2);

    // Or

    uniq_refs(scores, indices).par_bridge().for_each(|s| *s *= 2);
}

您可能希望考虑使用位集而不是 BTreeMap 来提高效率,但此答案仅使用标准库。

另请参阅:

  • How do I use Rayon with an existing iterator?

当编译器无法强制对 slice 元素的可变引用不是独占时,Cell 非常好。

您可以使用 Cell::from_mut, and then a &Cell<[T]> into a &[Cell<T>] using Cell::as_slice_of_cells&mut [T] 转换为 &Cell<[T]>。所有这些都是零成本:它只是用来指导类型系统。

A &[Cell<T>] 类似于 &[mut T],如果可以这样写:对一片可变元素的共享引用。您可以使用 Cells 进行的操作仅限于读取或替换 — 您无法获得对包装元素本身的引用,无论是否可变。 Rust 也知道 Cell 不是线程安全的(它没有实现 Sync)。这保证了一切都是安全的,没有动态成本。

fn main() {
    use std::cell::Cell;

    let slice: &mut [i32] = &mut [1, 2, 3];
    let cell_slice: &Cell<[i32]> = Cell::from_mut(slice);
    let slice_cell: &[Cell<i32>] = cell_slice.as_slice_of_cells();
    
    let two = &slice_cell[1];
    let another_two = &slice_cell[1];

    println!("This is 2: {:?}", two);
    println!("This is also 2: {:?}", another_two);
    
    two.set(42);
    println!("This is now 42!: {:?}", another_two);
}

由于我一直在处理类似的问题,这里是我的解决方案,除非绝对必要,否则我不建议使用:

struct EvilPtr<T> {
    ptr: *mut T,
}
impl<T> EvilPtr<T> {
    fn new(inp: &mut T) -> Self {
        EvilPtr { ptr: inp as *mut T }
    }
    unsafe fn deref(&self) -> *mut T {
        return self.ptr;
    }
}

unsafe impl<T> Sync for EvilPtr<T> {}
unsafe impl<T> Send for EvilPtr<T> {}

现在您可以:

let indices: [usize; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let mut arr: [i32; 10] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let e = EvilPtr::new(&mut arr[0]);
unsafe {
    indices.par_iter().for_each(|x: &usize| {
        *e.deref().add(*x) += *x as i32;
    });
}
println!("{:?}", arr);

如果你绝对需要这样做,我建议你把它埋在一些用户友好的界面下,你可以确保不会发生错误。

我有一个相关的问题。我需要并行分配给二维数组的任意列。我使用 ndarray myarray.axis_chunks_iter_mut(nd::Axis(1), 1) 遍历每一列。