gpt4 book ai didi

multithreading - 从 Rust 的 channel 迭代器中获取第一个接收到的值

转载 作者:行者123 更新时间:2023-12-03 11:30:29 25 4
gpt4 key购买 nike

我有一个 futures::channel::mpsc::UnboundedReceiver<T> 的迭代器.我想处理接收者的每一个答案,一次只处理一个,但同时还要处理其他 future 。

这应该可以通过循环 futures::select! 来实现。但我需要某种方法从 UnboundReceiver<T> 中获取已解析的值.我尝试使用 futures::future::select_all(Iter) ,但这无法编译并出现错误:futures::channel::mpsc::UnboundedReceiver<T> is not a future .

Playground 示例是 here .

最佳答案

futures::channel::mpsc::UnboundedReceiver 实现了 Stream 但不是 future,因此您可以通过调用 futures::创建一个 SelectAll stream::select_all(recv),然后通过调用 select_all.next() 解析到下一条就绪消息。我通过使用它改编了你的例子:

use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1

#[tokio::main]
async fn main() -> failure::Fallible<()> {
let mut recv = Vec::new();
let mut futures = stream::FuturesUnordered::new();
for _i in 0..3 {
let (tx, rx) = mpsc::unbounded();
recv.push(rx);
futures.push(tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
tx.unbounded_send("Message").unwrap();
}));
}
let mut select_all = select_all(recv);
loop {
futures::select! {
msg = select_all.next() => {
println!("{:#?}", msg);
}
_ = futures.select_next_some() => {
eprintln!("Thread died");
},
complete => break
}
}
Ok(())
}

请注意,这不是多线程而是异步编程,您生成异步的 tokio 任务而不是线程。我建议在这里阅读答案:What is the difference between asynchronous programming and multithreading?

关于multithreading - 从 Rust 的 channel 迭代器中获取第一个接收到的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65631020/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com