我如何 运行 分区数组上的并行计算线程?

How do I run parallel threads of computation on a partitioned array?

我正在尝试跨线程分布一个数组,并让线程并行地对数组的各个部分求和。我希望线程 0 对元素 0 1 2 求和,线程 1 对元素 3 4 5 求和。线程 2 对 6 和 7 求和。线程 3 对 8 和 9 求和。

我是 Rust 的新手,但之前用 C/C++/Java 编码过。我真的把所有东西都扔给了这个程序,我希望我能得到一些指导。

对不起,我的代码很草率,但当它是成品时,我会清理它。请忽略所有命名不当的 variables/inconsistent spacing/etc.

use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;

static NTHREADS: usize = 4;
static NPROCS: usize = 10;

fn main() {
    let mut a = [0; 10]; // a: [i32; 10]
    let mut endpoint = a.len() / NTHREADS;
    let mut remElements = a.len() % NTHREADS;

    for x in 0..a.len() {
        let secret_number = (rand::random::<i32>() % 100) + 1;
        a[x] = secret_number;
        println!("{}", a[x]);
    }
    let mut b = a;
    let mut x = 0;

    check_sum(&mut a);
    // serial_sum(&mut b);

    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut scale: usize = 0;

    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();
        // Each thread will send its id via the channel

        Thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            let numTougherThreads: usize = NPROCS % NTHREADS;
            let numTasksPerThread: usize = NPROCS / NTHREADS;
            let mut lsum = 0;

            if id < numTougherThreads {
                let mut q = numTasksPerThread+1;
                lsum = 0;

                while q > 0 {
                    lsum = lsum + a[scale];
                    scale+=1;
                    q = q-1;
                }
                println!("Less than numToughThreads lsum: {}", lsum);
            }
            if id >= numTougherThreads {
                let mut z = numTasksPerThread;
                lsum = 0;

                while z > 0 {
                    lsum = lsum + a[scale];
                    scale +=1;
                    z = z-1;
                }    
                println!("Greater than numToughthreads lsum: {}", lsum);
            }
            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
            thread_tx.send(lsum).unwrap();
        });
    }

    // Here, all the messages are collected
    let mut globalSum = 0;
    let mut ids = Vec::with_capacity(NTHREADS);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages      available
        ids.push(rx.recv());
    }
    println!("Global Sum: {}", globalSum);
    // Show the order in which the messages were sent

    println!("ids: {:?}", ids);
}

fn check_sum (arr: &mut [i32]) {
    let mut sum = 0;
    let mut i = 0;
    let mut size = arr.len();
    loop {
        sum += arr[i];
        i+=1;
        if i == size { break; }
    }
    println!("CheckSum is {}", sum);
}

到目前为止,我已经做到了这么多。无法弄清楚为什么线程 0 和 1 具有相同的总和以及 2 和 3 做同样的事情:

 -5
 -49
 -32
 99
 45
 -65
 -64
 -29
 -56
 65
 CheckSum is -91
 Greater than numTough lsum: -54
 thread 2 finished
 Less than numTough lsum: -86
 thread 1 finished
 Less than numTough lsum: -86
 thread 0 finished
 Greater than numTough lsum: -54
 thread 3 finished
 Global Sum: 0
 ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]

我设法通过使用以下代码重写它以处理偶数。

    while q > 0 {
        if id*s+scale == a.len() { break; }
        lsum = lsum + a[id*s+scale];
        scale +=1;
        q = q-1;
    }
    println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
    let mut z = numTasksPerThread;
    lsum = 0;
    let mut scale = 0;

    while z > 0 {
        if id*numTasksPerThread+scale == a.len() { break; }
        lsum = lsum + a[id*numTasksPerThread+scale];
        scale = scale + 1;
        z = z-1;
    }

您的所有任务都会获得 scale 变量的副本。线程 1 和 2 都做同样的事情,因为每个线程都有 scale0 的值,并以与另一个线程相同的方式修改它。 线程 3 和 4 也是如此。

Rust 可以防止破坏线程安全。如果 scale 由线程共享,则在访问变量时会出现竞争条件。

请阅读 closures, they explain the variable copying part, and about threading,其中解释了何时以及如何在线程之间共享变量。

欢迎来到 Rust! :)

Yeah at first I didn't realize each thread gets it's own copy of scale

不仅如此!它还拥有自己的 a!

副本

您尝试执行的操作可能类似于以下代码。我猜你更容易看到一个完整的工作示例,因为你似乎是一个 Rust 初学者并寻求指导。我故意用 Vec 替换了 [i32; 10],因为 Vecnot 隐含地 Copyable。它需要一个明确的clone();我们不能无意中复制它。请注意所有较大和较小的差异。代码也变得更加实用(更少 mut)。我评论了大部分值得注意的事情:

extern crate rand;

use std::sync::Arc;
use std::sync::mpsc;
use std::thread;

const NTHREADS: usize = 4; // I replaced `static` by `const`

// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
    let mut s = 0;
    for x in iter {
        s += x;
    }
    s
}

fn main() {
    // We don't want to clone the whole vector into every closure.
    // So we wrap it in an `Arc`. This allows sharing it.
    // I also got rid of `mut` here by moving the computations into
    // the initialization.
    let a: Arc<Vec<_>> =
        Arc::new(
            (0..10)
                .map(|_| {
                    (rand::random::<i32>() % 100) + 1
                })
                .collect()
        );

    let (tx, rx) = mpsc::channel(); // types will be inferred

    { // local scope, we don't need the following variables outside
        let num_tasks_per_thread = a.len() / NTHREADS; // same here
        let num_tougher_threads = a.len() % NTHREADS; // same here
        let mut offset = 0;
        for id in 0..NTHREADS {
            let chunksize =
                if id < num_tougher_threads {
                    num_tasks_per_thread + 1
                } else {
                    num_tasks_per_thread
                };
            let my_a = a.clone();  // refers to the *same* `Vec`
            let my_tx = tx.clone();
            thread::spawn(move || {
                let end = offset + chunksize;
                let partial_sum =
                    sum( (&my_a[offset..end]).iter().cloned() );
                my_tx.send(partial_sum).unwrap();
            });
            offset += chunksize;
        }
    }

    // We can close this Sender
    drop(tx);

    // Iterator magic! Yay! global_sum does not need to be mutable
    let global_sum = sum(rx.iter());
    println!("global sum via threads    : {}", global_sum);
    println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}

使用像 crossbeam 这样的 crate,你可以编写以下代码:

use crossbeam; // 0.7.3
use rand::distributions::{Distribution, Uniform}; // 0.7.3

const NTHREADS: usize = 4;

fn random_vec(length: usize) -> Vec<i32> {
    let step = Uniform::new_inclusive(1, 100);
    let mut rng = rand::thread_rng();
    step.sample_iter(&mut rng).take(length).collect()
}

fn main() {
    let numbers = random_vec(10);
    let num_tasks_per_thread = numbers.len() / NTHREADS;

    crossbeam::scope(|scope| {
        // The `collect` is important to eagerly start the threads!
        let threads: Vec<_> = numbers
            .chunks(num_tasks_per_thread)
            .map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<i32>()))
            .collect();

        let thread_sum: i32 = threads.into_iter().map(|t| t.join().unwrap()).sum();
        let no_thread_sum: i32 = numbers.iter().cloned().sum();

        println!("global sum via threads    : {}", thread_sum);
        println!("global sum single-threaded: {}", no_thread_sum);
    })
    .unwrap();
}

Scoped threads 允许您传入保证比线程长的引用。然后您可以直接使用线程的 return 值,跳过频道(很棒,只是这里不需要!)。

我跟着How can I generate a random number within a range in Rust?生成了随机数。我也将其更改为范围 [1,100],因为我 认为 这就是你的意思。但是,您的原始代码实际上是[-98,100],您也可以这样做。

Iterator::sum用于求和数字的迭代器。

我输入了线程工作的一些粗略性能数字,忽略了向量构造,处理了 100,000,000 个数字,使用 Rust 1.34 并在发布模式下编译:

| threads | time (ns) | relative time (%) |
|---------+-----------+-------------------|
|       1 |  33824667 |            100.00 |
|       2 |  16246549 |             48.03 |
|       3 |  16709280 |             49.40 |
|       4 |  14263326 |             42.17 |
|       5 |  14977901 |             44.28 |
|       6 |  12974001 |             38.36 |
|       7 |  13321743 |             39.38 |
|       8 |  13370793 |             39.53 |

另请参阅:

  • How can I pass a reference to a stack variable to a thread?