gpt4 book ai didi

asynchronous - rust future ::选择循环中修改局部变量

转载 作者:行者123 更新时间:2023-12-03 11:46:46 27 4
gpt4 key购买 nike

我有一个这样的结构G

struct G { /* some member */ }
impl G {
async fn ref_foo(&self) { /* some code uses G's member */ }
async fn mut_foo(&mut self) { /* some code modifies G's member */ }
}
负责处理来自 mpsc::Receiver的请求,如下所示-无法编译:
async fn run_loop(mut rx: impl Stream<Item = Task> + FusedStream + Unpin) {
let mut g = G {};
let mut QS = FuturesUnordered::new();
loop {
select! {
t = rx.select_next_some() => match t {
TR {..} => QS.push(g.ref_foo()),
TM {..} => QS.push(g.mut_foo()),
},
_ = QS.select_next_some() => {},
}
}
}
上面的代码由于对 g的多个可变引用而无法编译。
目标:
我想要的是,该循环针对任意数量的 ref_foo任务并行运行,并且当它需要运行 mut_foo任务时,它会等待直到每个 ref_foo任务完成,然后运行 mut_foo任务,然后它才能照常运行其他任务。
                / g.ref_foo() \                                    / ...
| g.ref_foo() | | ...
g.mut_foo() => < g.ref_foo() > => g.mut_foo() => g.mut_foo() => < ...
| g.ref_foo() | | ...
\ g.ref_foo() / \ ...
附加信息:
我曾经将 mut_foo的实现移到select循环,并删除了 async上的 g.mut_foo(),以便在流QS中不使用任何可变引用。
但是这种实现确实很麻烦,并且无疑破坏了G的设计。

刚才,我提出了一个包装器来实现另一个实现:
async fn run_task(mut g: G, t: Task) -> G {
match t {
TR {..} => g.ref_foo().await,
TM {..} => g.mut_foo().await,
};
g
}
在选择循环中:
async fn run_loop(mut rx: impl Stream<Item = Task> + FusedStream + Unpin) {
let g0 = G {};
let mut QS = FuturesUnordered::new();
let mut getter = FuturesUnordered::new();
getter.push(ready(g0));
loop {
select! {
t = rx.select_next_some() => {
let mut g = getter.select_next_some().await;
QS.push(run_task(g, t));
},
mut g = QS.select_next_some() => getter.push(ready(g)),
}
}
}
这个可以编译,但是它可能不是那么“异步”。在此实现中, ref_foo任务也按顺序运行。
问题:
  • 我应该学习更多的 Material 来解决这个问题吗?我使用的技术来自 rust-async-book
  • 我必须使用RefCell来解决此问题吗?恕我直言,这应该是一个微不足道的问题,可以解决而不破坏rust的借用规则(通过使用RefCell)。
  • 我可以更改我的自动换行run_task和select循环,以便ref_foo并行运行吗?我在实现中遇到问题,因为G正在流动getter => QS => getter => ...,没有长期的G实例,而且我不知道可以在哪里存储它。

  • 附加我的一些想法:
    由于 mut_foo不能并行运行,因此我试图通过删除 async上的 mut_foo关键字来解决此问题-进展不大。核心问题是 ref_foo的并行运行需要G的不可变ref,但是当需要 mut_foo时,我必须摆脱所有这些不可变的ref G。无论 mut_foo是否异步(或“ mut_foo是否返回ref G”),事实都不会改变。

    最佳答案

    我已经用很多if语句解决了问题3。我希望有一些更优雅的实现。而且,我非常感谢问题1中所述的任何学习 Material 。
    这是完整的代码(简化):

    use tokio::runtime;
    use std::thread;
    use std::time::Duration;
    use futures::{
    select, StreamExt, SinkExt,
    future::{ready},
    stream::{FusedStream, FuturesUnordered, Stream},
    };

    struct G;

    impl G {
    async fn ref_foo(&self) { println!("ref_foo +++"); tokio::time::sleep(Duration::from_millis(500)).await; println!("ref_foo ---"); }
    async fn mut_foo(&mut self) { println!("mut_foo +++"); tokio::time::sleep(Duration::from_millis(500)).await; println!("mut_foo ---"); }
    }

    #[derive(Clone)]
    enum Task {
    TR,
    TM,
    }

    // wrappers
    async fn run_ref_task(g: &G, task: Task) {
    match task {
    Task::TR => g.ref_foo().await,
    _ => {},
    };
    }

    async fn run_mut_task(mut g: G, task: Task) -> G {
    match task {
    Task::TM => g.mut_foo().await,
    _ => {},
    };
    g
    }

    async fn run_loop(mut rx: impl Stream<Item = Task> + FusedStream + Unpin) {
    let g0 = G;

    let mut getter = FuturesUnordered::new();
    getter.push(ready(g0));
    // the following streams stores only `ready(task)`
    let mut mut_tasks = FuturesUnordered::new(); // for tasks that's scheduled in this loop
    let mut ref_tasks = FuturesUnordered::new();
    let mut mut_delay = FuturesUnordered::new(); // for tasks that's scheduled in next loop
    let mut ref_delay = FuturesUnordered::new();

    loop {
    println!("============ avoid idle loops ============");
    let g = getter.select_next_some().await;
    {
    let mut queries = FuturesUnordered::new(); // where we schedule ref_foo tasks
    loop {
    println!("------------ avoid idle ref_task loops ------------");
    select! {
    task = rx.select_next_some() => {
    match &task {
    Task::TR => ref_delay.push(ready(task)),
    Task::TM => mut_tasks.push(ready(task)),
    };
    if mut_delay.is_empty() && ref_tasks.is_empty() && queries.is_empty() { break; }
    },
    task = mut_delay.select_next_some() => {
    mut_tasks.push(ready(task));
    if mut_delay.is_empty() && ref_tasks.is_empty() && queries.is_empty() { break; }
    }
    task = ref_tasks.select_next_some() => {
    queries.push(run_ref_task(&g, task));
    }
    _ = queries.select_next_some() => {
    if mut_delay.is_empty() && ref_tasks.is_empty() && queries.is_empty() { break; }
    },
    }
    }
    }
    getter.push(ready(g));

    {
    let mut queries = FuturesUnordered::new(); // where we schedule mut_foo tasks
    loop {
    println!("------------ avoid idle mut_task loops ------------");
    select! {
    task = rx.select_next_some() => {
    match &task {
    Task::TR => ref_tasks.push(ready(task)),
    Task::TM => mut_delay.push(ready(task)),
    };
    if ref_delay.is_empty() && mut_tasks.is_empty() && queries.is_empty() { break; }
    },
    task = ref_delay.select_next_some() => {
    ref_tasks.push(ready(task));
    if ref_delay.is_empty() && mut_tasks.is_empty() && queries.is_empty() { break; }
    }
    g = getter.select_next_some() => {
    if let Some(task) = mut_tasks.next().await {
    queries.push(run_mut_task(g, task));
    } else {
    getter.push(ready(g));
    if ref_delay.is_empty() && queries.is_empty() { break; }
    }
    }
    g = queries.select_next_some() => {
    getter.push(ready(g));
    if ref_delay.is_empty() && mut_tasks.is_empty() && queries.is_empty() { break; }
    }
    }
    }
    }
    }
    }

    fn main() {
    let (mut tx, rx) = futures::channel::mpsc::channel(10000);
    let th = thread::spawn(move || thread_main(rx));
    let tasks = vec![Task::TR, Task::TR, Task::TM, Task::TM, Task::TR, Task::TR, Task::TR, Task::TM, Task::TM];

    let rt = runtime::Builder::new_multi_thread().enable_time().build().unwrap();
    rt.block_on(async {
    loop {
    for task in tasks.clone() {
    tx.send(task).await.expect("");
    }
    tokio::time::sleep(Duration::from_secs(10)).await;
    }
    });
    th.join().expect("");
    }

    fn thread_main(rx: futures::channel::mpsc::Receiver<Task>) {
    let rt = runtime::Builder::new_multi_thread().enable_time().build().unwrap();
    rt.block_on(async {
    run_loop(rx).await;
    });
    }

    关于asynchronous - rust future ::选择循环中修改局部变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66197522/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com