- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想编写一个 SOCKS 服务器,它根据客户端请求的目的地选择多个 Internet 网关之一。大体流程是
对于这个内部服务器,生成了一个 Tokio 任务,它在 mpsc 队列上等待。收到的消息应包含 SOCKS5 地址信息和单发 channel 的 tx 端以返回结果。
另一个 Tokio 任务只是定期查询内部服务器:
extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;
use std::time;
use std::time::{Duration, Instant};
use std::fmt::Debug;
use tokio_core::reactor::{Core, Interval};
use tokio_timer::wheel;
use futures::{Future, Sink, Stream};
use futures::sync::{mpsc, oneshot};
type MsgRequest<A, E> = oneshot::Sender<Result<A, E>>;
type FutRequest<A, E> = mpsc::Sender<MsgRequest<A, E>>;
#[derive(Debug)]
struct Responder<A, E> {
fut_tx: FutRequest<A, E>,
}
impl<A: 'static, E: 'static> Responder<A, E>
where
E: Debug,
{
fn query(&self) -> Result<A, E> {
println!("enter query");
let (res_tx, res_rx) = oneshot::channel::<Result<A, E>>();
println!("send query");
let fut_tx = self.fut_tx.clone();
let res = fut_tx
.send(res_tx)
.then(|tx| {
if let Ok(_tx) = tx {
println!("Sink flushed");
}
res_rx
})
.and_then(|x| Ok(x))
.wait()
.unwrap();
res
}
}
impl<A: 'static, E: 'static> Clone for Responder<A, E> {
fn clone(&self) -> Self {
Responder {
fut_tx: self.fut_tx.clone(),
}
}
}
fn resolve(tx: oneshot::Sender<Result<u8, String>>) -> Result<(), ()> {
println!("resolve");
let delay = time::Duration::from_secs(10);
wheel()
.build()
.sleep(delay)
.then(|_| tx.send(Ok(0)))
.wait()
.unwrap();
println!("resolve answered");
Ok(())
}
fn main() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let (fut_tx, fut_rx) = mpsc::channel::<MsgRequest<u8, String>>(100);
let resolver = fut_rx.for_each(|msg| resolve(msg));
handle.spawn(resolver);
let responder = Responder { fut_tx };
let server = Interval::new_at(Instant::now(), Duration::new(2, 0), &handle)
.unwrap()
.for_each(move |_| {
println!("Call query for_each");
let rx = responder.clone();
let _res = rx.query();
Ok(())
})
.map_err(|_| ());
handle.spawn(server);
loop {
lp.turn(None);
}
}
使用 Cargo.toml 依赖项:
[dependencies]
futures = "0.1"
tokio-core = "0.1"
tokio-timer = "0.1"
这段代码死锁了。输出是:
Call query for_each
enter query
send query
Sink flushed
预期输出是:
Call query for_each
enter query
send query
Sink flushed
resolve
resolve answered
Call query for_each
enter query
send query
Sink flushed
resolve
resolve answered
....
表示以tx
结尾的请求已经成功发送到内部服务器,但内部服务器没有处理。根据我的理解,mpsc 和 oneshot 可用于在任务之间传输,而不仅仅是线程,因此包含线程不应像它那样死锁。
这里有什么问题吗?
最佳答案
阅读后Aaron's blog , future 的概念现在更加清晰了。我的第一种方法不是需求驱动的,因此是不充分的。函数 resolve() 实际上应该返回 future 而不是结果。
为了适本地结束这个问题,这里是我修改的、进一步简化的最小示例来展示这个概念:
extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;
use std::time;
use std::time::{Instant,Duration};
use tokio_core::reactor::{Core, Interval};
use tokio_timer::wheel;
use futures::{Future,Stream,Sink};
use futures::sync::{oneshot,mpsc};
type MsgRequest<A,E> = oneshot::Sender<Result<A,E>>;
fn main() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let (fut_tx, fut_rx) = mpsc::channel::<MsgRequest<u8,String>>(100);
let handle2 = handle.clone();
let resolver = fut_rx.and_then(move |tx| {
println!("Got query...wait a bit");
let delay = time::Duration::from_secs(5);
handle2.spawn({
wheel().build().sleep(delay)
.then(move |_|{
println!("Answer query");
tx.send(Ok(0)).unwrap();
println!("query answered");
Ok(())
})
});
Ok(())
})
.for_each(|_| {Ok(())});
handle.spawn(resolver);
let server = Interval::new_at(Instant::now(),
Duration::new(2,0),&handle).unwrap()
.then(move |_| {
let fut_tx = fut_tx.clone();
let (res_tx, res_rx) = oneshot::channel::<Result<u8,String>>();
println!("send query");
fut_tx.send(res_tx)
.then( |tx|{
if let Ok(_tx) = tx { println!("Sink flushed"); }
res_rx
})
})
.for_each(|res| {
println!("Received result {:?}",res);
Ok(())
}).map_err(|_| ());
handle.spawn(server);
loop {
lp.turn(None);
}
}
它按预期输出:
send query
Sink flushed
Got query...wait a bit
Answer query
query answered
Received result Ok(0)
send query
Sink flushed
Got query...wait a bit
Answer query
query answered
Received result Ok(0)
...
关于rust - 使用 Tokio 的 mpsc 和 oneshot 会导致死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47929646/
我正在尝试实现一个基于数组的环形缓冲区,它对多个生产者和单个消费者是线程安全的。主要思想是拥有原子头和尾索引。将元素插入队列时,头部会自动增加以在缓冲区中保留一个槽: #include #inclu
代码如下: use std::thread; use std::sync::mpsc; fn main() { //spawn threads let (tx, rx) = mpsc:
我有一个循环,我在其中做一些工作,并使用Sender发送结果。工作需要时间,如果失败,我需要重试。重试时,接收器可能已关闭,重试可能会浪费时间。因此,我需要一种无需发送消息即可检查Receiver是否
在我的代码片段中,tokio (v0.3) mpsc:channel接收器仅在缓冲区已满时接收消息。缓冲区的大小无关紧要。 use std::io; use std::net::{SocketAddr
我有一个包含 std::sync::mpsc::Receiver 的集合并希望根据其中的消息将它们分类到其他集合中。 我正在迭代接收器,测试它们是否收到消息,如果是,我测试消息的性质,并根据它,我将接
我想生成 n 个线程,它们能够与环形拓扑中的其他线程进行通信,例如线程 0 可以向线程 1 发送消息,线程 1 向线程 2 发送消息,以此类推,线程 n 向线程 0 发送消息。 这是我想用 n=3 实
我有一个闭包,它使用 std::sync::mpsc 中的 Sender: let node = Arc::new(Mutex::new(node_sender)); let switch_callb
我正在尝试创建一个异步 Rust 客户端,它在一端连接到 WebSocket 服务器,在另一端连接到蓝牙加密狗。在这两者之间会有一些过滤消息的逻辑。 我正在使用 rust-websocket用于 We
我正在尝试连续读取 Receiver在指定的持续时间内。我想出了以下解决方案 pub fn get( rx: &Receiver, get_duration: time::Durati
impl A { fn new() -> (A, std::sync::mpsc::Receiver) { let (sender, receiver) = std::sync
我认为 channel 的全部目的是在线程之间共享数据。我有这个代码,based on this example : let tx_thread = tx.clone(); let ctx = sel
我正在尝试处理 TcpStream在 Rust 程序(一个 tcp 服务器)中使用线程。我想在 HashMap 中跟踪当前客户端连接使用 Arc> . 当一个线程完成时,要从 HashMap 中删除连
我正在用 Rust 编写一个小游戏来学习多线程。我得到的代码包含两个循环,一个是逻辑循环,一个是渲染循环,如下所示: let (t1_entity_in, t1_entity_out) = mpsc:
无论我运行程序多少次,它总是以相同的顺序显示数字: use std::sync::mpsc::channel; use std::thread; fn main() { let (tx, rx
我想用 sdl2-rs crate 启动一个计时器来执行绘制调用。我想通过做这样的事情来开始它: extern crate sdl2; use std::sync::mpsc; enum Event
我有一个应用程序可以在回调中从队列中提取项目。回调在一个单独的线程中,所以我不能直接在回调中执行任何操作。队列中的项目包括一个 URL 和一个要 POST 到 URL 的消息体。根据响应,我从队列中删
我有这样一种情况,我正在设置具有多个发送器的 channel ,这些发送器需要能够将不同类型的数据发送到接收线程。 使用以下匹配表达式创建一个接收线程来处理这些消息。 let receiver_thr
我有一个 futures::sync::mpsc::Sender 的动态集合,我想为每个传入连接发送一条消息。 我让它与 UnboundedSender 一起工作,因为我只能这样做(见下文)但是 Se
我正在阅读 futures-preview 0.3 源代码以了解如何正确地“通知任何”。在 mpsc::channel(有界)中,多个发送者可能会等待接收(在缓冲区已满的情况下)。 研究 next_m
我有一个 futures::sync::mpsc::unbounded channel 。我可以发送消息到 UnboundedSender但从 UnboundedReciever 接收它们时遇到问题.
我是一名优秀的程序员,十分优秀!