
concurrency - How to do thread-safe IO and caching to a file in Rust?

Reposted. Author: 行者123. Updated: 2023-12-03 11:44:26

Context:
I am writing a web server in which we process different segments. I want to cache these different segments in different files (the segments can be up to 10 MB in size). Something like this:

pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(segment);
    // Cache this result to a file.
    let file_name = &format!("./cache/{}", &segment);
    fs::create(file_name);
    fs::write(file_name, result).expect("Unable to write file");
    result
}
Now, since segment_handler can be called from multiple threads with different segments, is fs::write thread-safe? If not, we cannot simply use a mutex, because segment: String may be different on every call, and a single mutex would hurt performance. I need something like a mutex, but keyed only on segment: String. What is the solution to this problem?
Environment:
  • rust: 1.47
  • web server: warp
  • the code is for: HLS streaming using ffmpeg
  • repo: https://github.com/blmhemu/rumble (caching not yet implemented)

Best answer

    The code you posted does not compile, because there is no such thing as fs::create, but luckily you do not need it at all: the fs::write function creates the file for you.
    At least on Linux, calling fs::write on the same path concurrently from several threads will result in the file containing the contents passed to one of the fs::write calls. Note, however, that if you use the file's existence to decide whether to read from the cache or to recompute the value, you can end up with several threads recomputing the same value and then all of them writing it to the file.
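    A related concern, not raised in the answer itself, is that a reader could observe a partially written cache file. A common mitigation is to write to a unique temporary path and then rename it into place; on Linux, a rename within one filesystem atomically replaces the destination. This is only a hedged sketch: the function name and the temp-file naming scheme are illustrative, and it does not prevent multiple threads from recomputing the same value (only the keyed map shown later does).

    ```rust
    use std::fs;
    use std::io;
    use std::path::Path;

    // Write the cache entry to a private temporary file, then atomically
    // rename it into place so readers never see a half-written file.
    fn write_cache_atomically(dir: &Path, segment: &str, data: &[u8]) -> io::Result<()> {
        // A per-process temporary name keeps concurrent writers on separate paths.
        let tmp = dir.join(format!("{}.tmp.{}", segment, std::process::id()));
        let dest = dir.join(segment);
        fs::write(&tmp, data)?;   // write the full contents off to the side
        fs::rename(&tmp, &dest)?; // atomically publish the finished file
        Ok(())
    }
    ```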

    You should also be aware that since you are using async/await, you must not use the std::fs module, because it blocks the thread. Use tokio::fs::write like this:

    pub async fn segment_handler(segment: String) {
        if is_cached(&segment) {
            return get_from_cache(segment);
        }
        // Do some computation to get the result.
        let result = do_some_large_computation(&segment);
        // Cache this result to a file.
        let file_name = &format!("./cache/{}", &segment);
        tokio::fs::write(file_name, result).await.expect("Unable to write file");
        result
    }
    Another correct option is to use spawn_blocking like this:
    pub async fn segment_handler(segment: String) {
        if is_cached(&segment) {
            return get_from_cache(segment);
        }
        tokio::task::spawn_blocking(move || {
            // Do some computation to get the result.
            let result = do_some_large_computation(&segment);
            // Cache this result to a file.
            let file_name = &format!("./cache/{}", &segment);
            // Inside spawn_blocking we are on a dedicated blocking thread
            // and the closure is not async, so the synchronous
            // std::fs::write is the right call here.
            std::fs::write(file_name, result).expect("Unable to write file");
            result
        }).await.expect("Panic in spawn_blocking")
    }
    You can read more about why blocking must be handled correctly like this in CPU-bound tasks and blocking code from the Tokio documentation:

    Tokio is able to concurrently run many tasks on a few threads by repeatedly swapping the currently running task on each thread. However, this kind of swapping can only happen at .await points, so code that spends a long time without reaching an .await will prevent other tasks from running. To combat this, Tokio provides two kinds of threads: Core threads and blocking threads. The core threads are where all asynchronous code runs, and Tokio will by default spawn one for each CPU core. The blocking threads are spawned on demand, and can be used to run blocking code that would otherwise block other tasks from running.

    To spawn a blocking task, you should use the spawn_blocking function.


    Note that I have linked to the documentation for Tokio 0.2, because warp does not yet support Tokio 0.3.

    To prevent the value from being computed multiple times when the function is called again before the first call finishes, you can use a technique based on a HashMap stored behind a mutex, like this:
    use std::collections::HashMap;
    use std::sync::Mutex;
    use tokio::sync::broadcast;

    pub struct Cache {
        inner: Mutex<Inner>,
    }
    struct Inner {
        cached: HashMap<String, CachedType>,
        pending: HashMap<String, broadcast::Sender<CachedType>>,
    }

    pub enum TryCached {
        Exists(CachedType),
        Pending(broadcast::Receiver<CachedType>),
        New(),
    }

    impl Cache {
        pub fn try_get(&self, key: &str) -> TryCached {
            let mut inner = self.inner.lock().unwrap();
            if let Some(value) = inner.cached.get(key) {
                // To avoid clone, use HashMap<String, Arc<CachedType>> and clone anyway.
                TryCached::Exists(value.clone())
            } else if let Some(pending) = inner.pending.get(key) {
                TryCached::Pending(pending.subscribe())
            } else {
                let (channel, _) = broadcast::channel(1);
                inner.pending.insert(key.to_string(), channel);
                TryCached::New()
            }
        }
        pub fn put_computed(&self, key: String, value: CachedType) {
            let mut inner = self.inner.lock().unwrap();
            if let Some(chan) = inner.pending.remove(&key) {
                // Ignore the error: it only means no receivers are waiting.
                let _ = chan.send(value.clone());
            }
            inner.cached.insert(key, value);
        }
    }
    The method can then be implemented as a call to try_get, doing different things depending on the value of the returned enum:
    pub async fn segment_handler(cache: &Cache, segment: String) -> CachedType {
        match cache.try_get(&segment) {
            TryCached::Exists(value) => value,
            TryCached::Pending(mut chan) => chan.recv().await.expect("Sender dropped without sending"),
            TryCached::New() => {
                let (segment, value) = tokio::task::spawn_blocking(move || {
                    // Do some computation to get the result.
                    let result = do_some_large_computation(&segment);
                    // Cache this result to a file.
                    let file_name = &format!("./cache/{}", &segment);
                    std::fs::write(file_name, result.to_slice()).expect("Unable to write file");
                    (segment, result)
                })
                .await
                .expect("Panic in spawn_blocking");

                cache.put_computed(segment, value.clone());
                value
            }
        }
    }
    A complete example can be found on the playground.
    This approach is completely thread-safe thanks to the mutex. Note that it uses a synchronous mutex rather than an async one; to learn more about why this is okay, see the shared state chapter of the Tokio tutorial.
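    As a rough, standalone illustration of why a synchronous mutex is acceptable here, the same Mutex<HashMap> pattern works unchanged with plain std threads, as long as each critical section is short. This sketch uses u64 values in place of CachedType, and the function name is hypothetical:

    ```rust
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::thread;

    // Spawn `n` threads that each insert one entry into a shared,
    // mutex-guarded map, mirroring how Cache::inner is used above.
    fn fill_cache_concurrently(n: u64) -> HashMap<String, u64> {
        let cache = Arc::new(Mutex::new(HashMap::new()));
        let handles: Vec<_> = (0..n)
            .map(|i| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    // The lock is held only for one quick insert, so no
                    // thread ever waits on it for long.
                    cache.lock().unwrap().insert(format!("segment-{}", i), i * 10);
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        // All threads have been joined, so this is the only Arc left.
        Arc::try_unwrap(cache).unwrap().into_inner().unwrap()
    }
    ```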

    This post about "concurrency - How to do thread-safe IO and caching to a file in Rust?" is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/64735270/
