如何将 ParallelIterator 转换回顺序迭代器?

How to convert ParallelIterator back to sequential Iterator?

我正在迭代数据库中数 GB 的输入项。在每个输入项上,我都在做一些 CPU 密集型处理,生成一个或多个新的输出项,总共数十 GB。然后将输出项存储在另一个数据库中 table.

通过使用 Rayon 进行并行处理,我获得了不错的加速。但是,数据库 API 不是线程安全的;它是 Send 但不是 Sync,因此必须序列化 I/O。

理想情况下,我只想写:

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .ser_bridge() // End parallelism. This function does not exist.
    .for_each(|output_item| {
        output_database.write_item(output_item);
    });

基本上我想要par_bridge()的对立面;在调用它的线程上运行的东西,从每个线程读取项目,并连续生成它们。但是在目前的 Rayon 实现中,这似乎并不存在。我不确定这是否是因为它在理论上是不可能的,或者它是否不适合当前库的设计。

输出太大,无法先将其全部收集到 Vec;它需要直接流式传输到数据库中。

顺便说一下,我没有和 Rayon 结婚;如果有另一个箱子更合适table,我很乐意做出改变。

我认为顺序无关紧要,因此您不需要对输出数据进行排序。

您可以使用 mpsc::channel 将数据从 for_each 闭包传输到您的数据库 api,例如

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each(move |output_item| {
        tx.send(output_item).unwrap();
    });

在第二个线程中,您可以使用 rx 变量接收数据并将其写入数据库。

您可以将输出数据库包装在 Arc<Mutex> 中以防止并行访问:

let output_database = Arc::new (Mutex::new (output_database));
input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each_with (output_database, |output_database, output_item| {
        output_database.lock().write_item(output_item);
    });