如何将 ParallelIterator 转换回顺序迭代器?
How to convert ParallelIterator back to sequential Iterator?
我正在迭代数据库中数 GB 的输入项。在每个输入项上,我都在做一些 CPU 密集型处理,生成一个或多个新的输出项,总共数十 GB。然后将输出项存储在另一个数据库中 table.
通过使用 Rayon 进行并行处理,我获得了不错的加速。但是,数据库 API 不是线程安全的;它是 Send
但不是 Sync
,因此必须序列化 I/O。
理想情况下,我只想写:
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
基本上我想要par_bridge()
的对立面;在调用它的线程上运行的东西,从每个线程读取项目,并连续生成它们。但是在目前的 Rayon 实现中,这似乎并不存在。我不确定这是否是因为它在理论上是不可能的,或者它是否不适合当前库的设计。
输出太大,无法先将其全部收集到 Vec
;它需要直接流式传输到数据库中。
顺便说一下,我没有和 Rayon 结婚;如果有另一个箱子更合适table,我很乐意做出改变。
我认为顺序无关紧要,因此您不需要对输出数据进行排序。
您可以使用 mpsc::channel
将数据从 for_each
闭包传输到您的数据库 api,例如
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each(move |output_item| {
tx.send(output_item).unwrap();
});
在第二个线程中,您可以使用 rx
变量接收数据并将其写入数据库。
您可以将输出数据库包装在 Arc<Mutex>
中以防止并行访问:
let output_database = Arc::new (Mutex::new (output_database));
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each_with (output_database, |output_database, output_item| {
output_database.lock().write_item(output_item);
});
我正在迭代数据库中数 GB 的输入项。在每个输入项上,我都在做一些 CPU 密集型处理,生成一个或多个新的输出项,总共数十 GB。然后将输出项存储在另一个数据库中 table.
通过使用 Rayon 进行并行处理,我获得了不错的加速。但是,数据库 API 不是线程安全的;它是 Send
但不是 Sync
,因此必须序列化 I/O。
理想情况下,我只想写:
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
基本上我想要par_bridge()
的对立面;在调用它的线程上运行的东西,从每个线程读取项目,并连续生成它们。但是在目前的 Rayon 实现中,这似乎并不存在。我不确定这是否是因为它在理论上是不可能的,或者它是否不适合当前库的设计。
输出太大,无法先将其全部收集到 Vec
;它需要直接流式传输到数据库中。
顺便说一下,我没有和 Rayon 结婚;如果有另一个箱子更合适table,我很乐意做出改变。
我认为顺序无关紧要,因此您不需要对输出数据进行排序。
您可以使用 mpsc::channel
将数据从 for_each
闭包传输到您的数据库 api,例如
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each(move |output_item| {
tx.send(output_item).unwrap();
});
在第二个线程中,您可以使用 rx
变量接收数据并将其写入数据库。
您可以将输出数据库包装在 Arc<Mutex>
中以防止并行访问:
let output_database = Arc::new (Mutex::new (output_database));
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each_with (output_database, |output_database, output_item| {
output_database.lock().write_item(output_item);
});