你如何用 Tokio 简单地包装同步网络 I/O?

How do you wrap synchronous network I/O trivially with Tokio?

不幸的是,我对 Rust 并发开发的理解存在明显的缺陷。这个问题源于为解决一个看似“微不足道”的问题而进行的数周反复斗争。


问题域

正在开发一个名为 twistrs 的 Rust 库,它是一个域名排列和枚举库。该库的目的和 objective 是提供一个根域(例如 google.com)并生成该域的排列(例如 guugle.com)和丰富该排列(例如它解析为123.123.123.123).

它的 objective 之一就是比它的 Python counterpart 执行得快得多。最值得注意的是网络调用,例如但不限于 DNS 查找。

当前设计方案

库背后的想法(除了作为一个学习基地)是开发一个非常简单的安全库,可以实现以满足各种要求。您(作为客户)可以选择在内部直接与 permutation or enrichment 模块交互,或使用提供的库 async/concurrent 实现。

请注意,没有 内部共享状态。这可能是非常低效的,但暂时没有意义,因为它可以防止很多问题。

当前问题

在内部,DNS 查找是同步完成的,自然会阻塞。我无法将其转换为并发代码。我能得到的最接近的是使用 tokio mpsc 频道,并执行一个单独的 tokio 任务:

use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let domain = Domain::new("google.com").unwrap();

    let _permutations = domain.all().unwrap().collect::<Vec<String>>();

    let (mut tx, mut rx) = mpsc::channel(1000);

    tokio::spawn(async move {
        for (i, v) in _permutations.into_iter().enumerate() {
            let domain_metadata = DomainMetadata::new(v.clone());

            let dns_resolution = domain_metadata.dns_resolvable();

            if let Err(_) = tx.send((i, dns_resolution)).await {
                println!("receiver dropped");
                return;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got: {:?}", i);
    }
}

也就是说,机敏的 reader 会立即注意到这会阻止,并有效地 运行 以任何方式同步进行 DNS 查找。

尝试在 for 循环中生成 Tokio 任务是不可能的,因为 move 是在 tx 上完成的(并且 tx 不是 impl Copy ):

for (i, v) in _permutations.into_iter().enumerate() {
    tokio::spawn(async move {
        let domain_metadata = DomainMetadata::new(v.clone());

        let dns_resolution = domain_metadata.dns_resolvable();

        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
    });
}

删除 await 当然不会发生任何事情,因为需要轮询生成的任务。我如何有效地将所有这些同步调用包装到异步任务中,这些任务可以 运行 独立并最终汇聚成一个集合?

我遇到的一个类似的 Rust 项目是 batch_resolve,它在这方面做得很好 (!)。但是,我发现实现对于我想要实现的目标来说异常复杂(也许我错了)。非常感谢任何有助于实现这一目标的帮助或见解。

如果您想快速重现此内容,只需克隆项目并使用此 post 中的第一个代码片段更新 examples/twistrs-cli/main.rs

编辑:我误解了你的问题,没有意识到 DNS 解析本身不是异步的。以下方法实际上不适用于同步代码,只会导致执行程序因阻塞代码而停止,但我会保留它以防您切换到异步解决方法。如果满足您的需要,我建议使用 tokio 的异步 lookup_host()


异步执行器旨在处理大量并行任务,因此您可以尝试为每个请求生成一个新任务,使用 Semaphore 创建 运行 数量的上限一次完成任务。其代码可能如下所示:

let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once

for (i, v) in _permutations.into_iter().enumerate() {
    let domain_metadata = DomainMetadata::new(v.clone());
    let mut tx = tx.clone(); // every task will have its own copy of the sender
    let permit = semaphore.acquire_owned().await; // wait until we have a permit

    let dns_resolution = domain_metadata.dns_resolvable();
    tokio::spawn(async move {
        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
        drop(permit); // explicitly release the permit, to make sure it was moved into this task
    }); // note: task spawn results and handle dropped here
}

while let Some(i) = rx.recv().await {
    println!("got: {:?}", i);
}

如果额外任务的开销太大,您可以尝试使用 futures crate 中的 FuturesUnordered 等工具将这些任务组合成一个单一的 future。这允许您获取任意大的 futures 列表并在单个任务中重复轮询它们。