你如何用 Tokio 简单地包装同步网络 I/O?
How do you wrap synchronous network I/O trivially with Tokio?
不幸的是,我对 Rust 并发开发的理解存在明显的缺陷。这个问题源于为解决一个看似“微不足道”的问题而进行的数周反复斗争。
问题域
正在开发一个名为 twistrs 的 Rust 库,它是一个域名排列和枚举库。该库的目的和 objective 是提供一个根域(例如 google.com
)并生成该域的排列(例如 guugle.com
)和丰富该排列(例如它解析为123.123.123.123
).
它的 objective 之一就是比它的 Python counterpart 执行得快得多。最值得注意的是网络调用,例如但不限于 DNS 查找。
当前设计方案
库背后的想法(除了作为一个学习基地)是开发一个非常简单的安全库,可以实现以满足各种要求。您(作为客户)可以选择在内部直接与 permutation or enrichment 模块交互,或使用提供的库 async/concurrent 实现。
请注意,没有 内部共享状态。这可能是非常低效的,但暂时没有意义,因为它可以防止很多问题。
当前问题
在内部,DNS 查找是同步完成的,自然会阻塞。我无法将其转换为并发代码。我能得到的最接近的是使用 tokio mpsc 频道,并执行一个单独的 tokio 任务:
use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let domain = Domain::new("google.com").unwrap();
let _permutations = domain.all().unwrap().collect::<Vec<String>>();
let (mut tx, mut rx) = mpsc::channel(1000);
tokio::spawn(async move {
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
}
也就是说,机敏的 reader 会立即注意到这会阻止,并有效地 运行 以任何方式同步进行 DNS 查找。
尝试在 for 循环中生成 Tokio 任务是不可能的,因为 move
是在 tx
上完成的(并且 tx
不是 impl Copy
):
for (i, v) in _permutations.into_iter().enumerate() {
tokio::spawn(async move {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
});
}
删除 await
当然不会发生任何事情,因为需要轮询生成的任务。我如何有效地将所有这些同步调用包装到异步任务中,这些任务可以 运行 独立并最终汇聚成一个集合?
我遇到的一个类似的 Rust 项目是 batch_resolve,它在这方面做得很好 (!)。但是,我发现实现对于我想要实现的目标来说异常复杂(也许我错了)。非常感谢任何有助于实现这一目标的帮助或见解。
如果您想快速重现此内容,只需克隆项目并使用此 post 中的第一个代码片段更新 examples/twistrs-cli/main.rs
。
编辑:我误解了你的问题,没有意识到 DNS 解析本身不是异步的。以下方法实际上不适用于同步代码,只会导致执行程序因阻塞代码而停止,但我会保留它以防您切换到异步解决方法。如果满足您的需要,我建议使用 tokio 的异步 lookup_host()
。
异步执行器旨在处理大量并行任务,因此您可以尝试为每个请求生成一个新任务,使用 Semaphore
创建 运行 数量的上限一次完成任务。其代码可能如下所示:
let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let mut tx = tx.clone(); // every task will have its own copy of the sender
let permit = semaphore.acquire_owned().await; // wait until we have a permit
let dns_resolution = domain_metadata.dns_resolvable();
tokio::spawn(async move {
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
drop(permit); // explicitly release the permit, to make sure it was moved into this task
}); // note: task spawn results and handle dropped here
}
while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
如果额外任务的开销太大,您可以尝试使用 futures
crate 中的 FuturesUnordered
等工具将这些任务组合成一个单一的 future。这允许您获取任意大的 futures 列表并在单个任务中重复轮询它们。
不幸的是,我对 Rust 并发开发的理解存在明显的缺陷。这个问题源于为解决一个看似“微不足道”的问题而进行的数周反复斗争。
问题域
正在开发一个名为 twistrs 的 Rust 库,它是一个域名排列和枚举库。该库的目的和 objective 是提供一个根域(例如 google.com
)并生成该域的排列(例如 guugle.com
)和丰富该排列(例如它解析为123.123.123.123
).
它的 objective 之一就是比它的 Python counterpart 执行得快得多。最值得注意的是网络调用,例如但不限于 DNS 查找。
当前设计方案
库背后的想法(除了作为一个学习基地)是开发一个非常简单的安全库,可以实现以满足各种要求。您(作为客户)可以选择在内部直接与 permutation or enrichment 模块交互,或使用提供的库 async/concurrent 实现。
请注意,没有 内部共享状态。这可能是非常低效的,但暂时没有意义,因为它可以防止很多问题。
当前问题
在内部,DNS 查找是同步完成的,自然会阻塞。我无法将其转换为并发代码。我能得到的最接近的是使用 tokio mpsc 频道,并执行一个单独的 tokio 任务:
use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let domain = Domain::new("google.com").unwrap();
let _permutations = domain.all().unwrap().collect::<Vec<String>>();
let (mut tx, mut rx) = mpsc::channel(1000);
tokio::spawn(async move {
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
}
也就是说,机敏的 reader 会立即注意到这会阻止,并有效地 运行 以任何方式同步进行 DNS 查找。
尝试在 for 循环中生成 Tokio 任务是不可能的,因为 move
是在 tx
上完成的(并且 tx
不是 impl Copy
):
for (i, v) in _permutations.into_iter().enumerate() {
tokio::spawn(async move {
let domain_metadata = DomainMetadata::new(v.clone());
let dns_resolution = domain_metadata.dns_resolvable();
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
});
}
删除 await
当然不会发生任何事情,因为需要轮询生成的任务。我如何有效地将所有这些同步调用包装到异步任务中,这些任务可以 运行 独立并最终汇聚成一个集合?
我遇到的一个类似的 Rust 项目是 batch_resolve,它在这方面做得很好 (!)。但是,我发现实现对于我想要实现的目标来说异常复杂(也许我错了)。非常感谢任何有助于实现这一目标的帮助或见解。
如果您想快速重现此内容,只需克隆项目并使用此 post 中的第一个代码片段更新 examples/twistrs-cli/main.rs
。
编辑:我误解了你的问题,没有意识到 DNS 解析本身不是异步的。以下方法实际上不适用于同步代码,只会导致执行程序因阻塞代码而停止,但我会保留它以防您切换到异步解决方法。如果满足您的需要,我建议使用 tokio 的异步 lookup_host()
。
异步执行器旨在处理大量并行任务,因此您可以尝试为每个请求生成一个新任务,使用 Semaphore
创建 运行 数量的上限一次完成任务。其代码可能如下所示:
let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once
for (i, v) in _permutations.into_iter().enumerate() {
let domain_metadata = DomainMetadata::new(v.clone());
let mut tx = tx.clone(); // every task will have its own copy of the sender
let permit = semaphore.acquire_owned().await; // wait until we have a permit
let dns_resolution = domain_metadata.dns_resolvable();
tokio::spawn(async move {
if let Err(_) = tx.send((i, dns_resolution)).await {
println!("receiver dropped");
return;
}
drop(permit); // explicitly release the permit, to make sure it was moved into this task
}); // note: task spawn results and handle dropped here
}
while let Some(i) = rx.recv().await {
println!("got: {:?}", i);
}
如果额外任务的开销太大,您可以尝试使用 futures
crate 中的 FuturesUnordered
等工具将这些任务组合成一个单一的 future。这允许您获取任意大的 futures 列表并在单个任务中重复轮询它们。