gpt4 book ai didi

concurrency - 防止并发执行

转载 作者:行者123 更新时间:2023-12-03 11:30:51 24 4
gpt4 key购买 nike

我想防止并发执行异步调用的函数。
从 super 服务调用该函数,并且两个连接应导致一个等待,直到另一个函数调用完成。我认为实现Future来阻止执行,直到完成其他线程/连接都可以解决该问题。遇到我的问题,我将 future 存储在Mutex<HashMap<i64, LockFut>>中,但是当我锁定互斥锁以获取并等待LockFut时,它显然会提示MutexGuard无法发送。我不知道如何解决此问题,或者我的方法很糟糕。

    |
132 | let mut locks = LOCKS.lock().unwrap();
| --------- has type `std::sync::MutexGuard<'_, std::collections::HashMap<i64, hoster::hoster::LockFut>>`
...
136 | lock.await;
| ^^^^^^^^^^ await occurs here, with `mut locks` maybe used later
137 | }
| - `mut locks` is later dropped here
这是我 future 的实现
lazy_static! {
static ref LOCKS: Mutex<HashMap<i64, LockFut>> = Mutex::new(HashMap::new());
}

struct LockState {
waker: Option<Waker>,
locked: bool
}

struct LockFut {
state: Arc<Mutex<LockState>>
}

impl Future for LockFut {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.state.lock().unwrap();
match state.locked {
false => {
Poll::Ready(())
},
true => {
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}

impl LockFut {
fn new() -> LockFut {
LockFut {
state: Arc::new(Mutex::new(LockState {
locked: false,
waker: None
}))
}
}

pub fn release_lock(&mut self) {
let mut state = self.state.lock().unwrap();
state.locked = false;
if let Some(waker) = state.waker.take() {
waker.wake();
}
}

pub async fn lock<'a>(id: i64) {
let mut locks = LOCKS.lock().unwrap();
// Wait for existing lock to be unlocked or create a new lock
let lock = locks.entry(id).or_insert(LockFut::new());
// Wait for the potential lock to be released
lock.await;
}

pub fn unlock(id: i64) {
match LOCKS.lock().unwrap().get_mut(&id) {
Some(lock) => lock.release_lock(),
None => warn!("No lock found for: {}", id)
};
}
}
这就是我所说的
async fn is_concurrent(id: i64) {
should_not_be_concurrent().await;
}

async fn should_not_be_concurrent(id: i64) {
LockFut::lock(id).await;
// Do crazy stuff
LockFut::unlock(id);
}

最佳答案

标准Mutex的保护确实是!Send,因此不能在await -s之间携带。对于该任务,通常要考虑使用异步互斥锁。 futures 中有一个,还有stand-alone crate中。他们的 guard 是Send,此时应解决此问题。
但是我想进一步说LockFut解决了与异步Mutex完全相同的问题。因此,对于此特定示例代码,可以将其显着简化为以下代码(playground):

use std::sync::Mutex as StdMutex;
use futures::lock::Mutex;

#[derive(Default)]
struct State { .. }

type SharedState = Arc<Mutex<State>>;

lazy_static! {
static ref LOCKS: StdMutex<HashMap<i64, SharedState>> = Default::default();
}

fn acquire_state<'a>(id: i64) -> SharedState {
Arc::clone(&LOCKS.lock().unwrap().entry(id).or_default())
}


// Acquiring is straightforward:
let mut state = acquire_state(0).lock().await;


// or with your functions:
async fn is_concurrent(id: i64) {
should_not_be_concurrent(id).await;
}

async fn should_not_be_concurrent(id: i64) {
let mut state = acquire_state(id).lock().await;
// Do crazy stuff

// As a bonus there's no need in manual unlocking here
// since `drop(state)` unlocks the mutex.
}
另外,您可能会发现有关异步代码中互斥锁的有用的 this博客文章。

关于concurrency - 防止并发执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62745891/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com