How to perform thread safe IO and caching to file in rust?

Context:

I am writing a web server in which we handle different segments. I want to cache these different segments in different files (the segments can be up to 10 MB each), like this:

pub async fn segment_handler(segment: String) {
   if is_cached(&segment) {
       return get_from_cache(segment)
   }
   // Do some computation to get the result.
   let result = do_some_large_computation(segment);
   // Cache this result to a file.
   let file_name = &format!("./cache/{}", &segment);
   fs::create(file_name);
   fs::write(file_name, result).expect("Unable to write file");
   result
}

Since segment_handler can be called by multiple threads for different segments, is fs::write thread safe? If not, we can't just use a mutex, because the segment: String may be different for each call and a single mutex would hurt performance. I need something like a mutex, but keyed only on segment: String. What is the solution to this problem?

Answer:

The code you posted does not compile, because there is no such thing as fs::create, but luckily you don't need it at all: the fs::write function creates the file for you.
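For instance, here is a minimal sketch (the ./cache/example path is just a placeholder) showing that fs::write alone is enough to create the file:

use std::fs;

fn main() {
    // Make sure the cache directory exists; fs::write does not create parent directories.
    fs::create_dir_all("./cache").expect("Unable to create cache dir");
    // fs::write creates ./cache/example if it is missing and truncates it if it
    // already exists, so no separate create call is needed.
    fs::write("./cache/example", b"some bytes").expect("Unable to write file");
}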

At least on Linux, calling fs::write concurrently on the same path from several different threads will result in the file containing the contents passed to one of the fs::write calls. Note that if you use the existence of the file to decide whether you need to read it from the cache or recompute it, you may end up with multiple threads recomputing the same value and then all of them writing it to the file.
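As a minimal sketch of that race (expensive_computation and the hard-coded path are stand-ins for illustration, not part of the original code): two tasks can both see the file as missing, both run the computation, and both write the same result.

use tokio::fs;

// Stand-in for the real work; it only exists so the sketch compiles.
fn expensive_computation(segment: &str) -> Vec<u8> {
    segment.as_bytes().to_vec()
}

async fn racy_get(segment: &str) -> Vec<u8> {
    let path = format!("./cache/{}", segment);
    if let Ok(bytes) = fs::read(&path).await {
        return bytes; // cache hit
    }
    // Another task may reach this point for the same segment before the write
    // below completes, so the value can be computed more than once, even though
    // the final file contents come from exactly one of the writes.
    let result = expensive_computation(segment);
    fs::write(&path, &result).await.expect("Unable to write file");
    result
}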


You should be aware that since you are using async/await, you must not use the std::fs module, because it blocks the thread. You should use tokio::fs::write like this:

pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(&segment);
    // Cache this result to a file. Pass a borrow so `result` can still be returned below.
    let file_name = &format!("./cache/{}", &segment);
    tokio::fs::write(file_name, &result).await.expect("Unable to write file");
    result
}

Another correct option is to use spawn_blocking like this:

pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    tokio::task::spawn_blocking(move || {
        // Do some computation to get the result.
        let result = do_some_large_computation(&segment);
        // Cache this result to a file. The closure is synchronous, so use
        // std::fs::write here; you cannot .await inside it.
        let file_name = &format!("./cache/{}", &segment);
        std::fs::write(file_name, &result).expect("Unable to write file");
        result
    }).await.expect("Panic in spawn_blocking")
}

You can read more about why blocking has to be handled correctly like this in the CPU-bound tasks and blocking code section of Tokio's documentation:

Tokio is able to concurrently run many tasks on a few threads by repeatedly swapping the currently running task on each thread. However, this kind of swapping can only happen at .await points, so code that spends a long time without reaching an .await will prevent other tasks from running. To combat this, Tokio provides two kinds of threads: Core threads and blocking threads. The core threads are where all asynchronous code runs, and Tokio will by default spawn one for each CPU core. The blocking threads are spawned on demand, and can be used to run blocking code that would otherwise block other tasks from running.

To spawn a blocking task, you should use the spawn_blocking function.

Note that I have linked to the documentation for Tokio 0.2, because warp does not yet support Tokio 0.3.


To prevent the value from being computed multiple times if the function is called again before the first call has finished, you can use a technique based on a HashMap stored behind a mutex, like this:

use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::broadcast;

pub struct Cache {
    inner: Mutex<Inner>,
}
struct Inner {
    cached: HashMap<String, CachedType>,
    pending: HashMap<String, broadcast::Sender<CachedType>>,
}

pub enum TryCached {
    Exists(CachedType),
    Pending(broadcast::Receiver<CachedType>),
    New(),
}

impl Cache {
    pub fn try_get(&self, key: &str) -> TryCached {
        let mut inner = self.inner.lock().unwrap();
        if let Some(value) = inner.cached.get(key) {
            // To avoid cloning the value itself, store Arc<CachedType> in the map and clone the Arc instead.
            TryCached::Exists(value.clone())
        } else if let Some(pending) = inner.pending.get(key) {
            TryCached::Pending(pending.subscribe())
        } else {
            let (channel, _) = broadcast::channel(1);
            inner.pending.insert(key.to_string(), channel);
            TryCached::New()
        }
    }
    pub fn put_computed(&self, key: String, value: CachedType) {
        let mut inner = self.inner.lock().unwrap();
        if let Some(chan) = inner.pending.remove(&key) {
            // Ignore the send error: it just means no one is currently waiting.
            let _ = chan.send(value.clone());
        }
        inner.cached.insert(key, value);
    }
}

The method can then be implemented as a call to try_get that performs different actions depending on the returned enum value:

pub async fn segment_handler(cache: &Cache, segment: String) -> CachedType {
    match cache.try_get(&segment) {
        TryCached::Exists(value) => value,
        TryCached::Pending(mut chan) => chan.recv().await.expect("Sender dropped without sending"),
        TryCached::New() => {
            let (segment, value) = tokio::task::spawn_blocking(move || {
                // Do some computation to get the result.
                let result = do_some_large_computation(&segment);
                // Cache this result to a file.
                let file_name = &format!("./cache/{}", &segment);
                std::fs::write(file_name, result.to_slice()).expect("Unable to write file");
                (segment, result)
            })
            .await
            .expect("Panic in spawn_blocking");

            cache.put_computed(segment, value.clone());
            value
        }
    }
}

A full example is available on the playground.

This approach is fully thread-safe thanks to the mutex. Note that it uses a synchronous mutex rather than an asynchronous one. To read more about why this is okay, see the shared state chapter of the Tokio tutorial.
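As a minimal sketch of the pattern that makes the synchronous mutex fine here (the Counter type is a made-up example, not part of the code above): the guard is taken and dropped inside plain synchronous code, so it is never held across an .await point.

use std::future::Future;
use std::sync::Mutex;

struct Counter {
    inner: Mutex<u64>,
}

impl Counter {
    fn bump(&self) -> u64 {
        // Short critical section: lock, update, return.
        let mut n = self.inner.lock().unwrap();
        *n += 1;
        *n
    } // the guard is dropped here, before any .await can happen
}

async fn handler(counter: &Counter, work: impl Future<Output = ()>) -> u64 {
    let value = counter.bump(); // lock taken and released synchronously
    work.await;                 // any awaiting happens after the guard is gone
    value
}

The Cache::try_get and put_computed methods above follow the same shape: they lock, do a quick map lookup or insert, and return before anything is awaited.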