gpt4 book ai didi

rust - 发送到数组中的每个 futures::sync::mpsc::Sender

转载 作者:行者123 更新时间:2023-11-29 08:18:14 26 4
gpt4 key购买 nike

我有一个 futures::sync::mpsc::Sender 的动态集合,我想为每个传入连接发送一条消息。

我让它与 UnboundedSender 一起工作,因为我只能这样做(见下文)但是 Sender 会消耗自身,所以我需要将其删除并重新插入到 Vec 发送后。我怎样才能做到这一点?如果 Sender 阻塞,它不应该发送更多消息,而是切换到处理接收方的传入连接。

UnboundedSender 实现在下面,我尝试这样做的失败被内联注释掉了(只需用注释掉的行替换前面的行)。

UnboundedSender(有效)

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;

fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::UnboundedSender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::unbounded();
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))

},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.unbounded_send(x).unwrap();
}
},
}
Ok(())
});

current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}

发件人(不起作用)

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;

fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::Sender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))

},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.send(x);
//^error[E0507]: cannot move out of borrowed content
}
},
}
Ok(())
});

current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}

最佳答案

AFAIK,你有两个主要问题,send() 取得了 Sender 的所有权,所以如果你想稍后重用它,你必须克隆到某个地方并且它返回一个你必须以某种方式处理的 future 。

有多种方法可以解决这些问题,这里是一种:

extern crate futures;
extern crate tokio;

use futures::sync::mpsc;
use futures::Future;
use futures::{stream, Sink, Stream};

fn main() {
let values = vec![1i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1]; // remove cast syntax
let mut senders = vec![]; // remove annotations
let stream = stream::iter_ok(values).for_each(move |v| { // move senders
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
tokio::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}));
}
-1 => {
println!("Closing channels");
senders.clear();
}
x => {
for sender in senders.iter() {
let send = sender
.clone() // clone sender
.send(x)
.map(move |_| println!("Sending {}", x))
.map_err(|e| eprintln!("error = {:?}", e));
tokio::spawn(send); // spawn the task
}
}
}
Ok(())
});

tokio::run(stream);
println!("Done!");
}

关于rust - 发送到数组中的每个 futures::sync::mpsc::Sender,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53557609/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com