gpt4 book ai didi

rust - 使用 Tokio 的 mpsc 和 oneshot 会导致死锁

转载 作者:行者123 更新时间:2023-11-29 08:32:56 25 4
gpt4 key购买 nike

我想编写一个 SOCKS 服务器,它根据客户端请求的目的地选择多个 Internet 网关之一。大体流程是

  1. 执行SOCKS5协商并从客户端获取地址信息
  2. 请求内部服务器选择互联网网关和目标 IP
  3. 联系并进行沟通

对于这个内部服务器,生成了一个 Tokio 任务,它在 mpsc 队列上等待。收到的消息应包含 SOCKS5 地址信息和单发 channel 的 tx 端以返回结果。

另一个 Tokio 任务只是定期查询内部服务器:

extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;

use std::time;
use std::time::{Duration, Instant};
use std::fmt::Debug;
use tokio_core::reactor::{Core, Interval};
use tokio_timer::wheel;
use futures::{Future, Sink, Stream};
use futures::sync::{mpsc, oneshot};

type MsgRequest<A, E> = oneshot::Sender<Result<A, E>>;
type FutRequest<A, E> = mpsc::Sender<MsgRequest<A, E>>;

#[derive(Debug)]
struct Responder<A, E> {
fut_tx: FutRequest<A, E>,
}

impl<A: 'static, E: 'static> Responder<A, E>
where
E: Debug,
{
fn query(&self) -> Result<A, E> {
println!("enter query");
let (res_tx, res_rx) = oneshot::channel::<Result<A, E>>();
println!("send query");
let fut_tx = self.fut_tx.clone();
let res = fut_tx
.send(res_tx)
.then(|tx| {
if let Ok(_tx) = tx {
println!("Sink flushed");
}
res_rx
})
.and_then(|x| Ok(x))
.wait()
.unwrap();
res
}
}

impl<A: 'static, E: 'static> Clone for Responder<A, E> {
fn clone(&self) -> Self {
Responder {
fut_tx: self.fut_tx.clone(),
}
}
}

fn resolve(tx: oneshot::Sender<Result<u8, String>>) -> Result<(), ()> {
println!("resolve");
let delay = time::Duration::from_secs(10);
wheel()
.build()
.sleep(delay)
.then(|_| tx.send(Ok(0)))
.wait()
.unwrap();
println!("resolve answered");
Ok(())
}

fn main() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();

let (fut_tx, fut_rx) = mpsc::channel::<MsgRequest<u8, String>>(100);
let resolver = fut_rx.for_each(|msg| resolve(msg));
handle.spawn(resolver);

let responder = Responder { fut_tx };

let server = Interval::new_at(Instant::now(), Duration::new(2, 0), &handle)
.unwrap()
.for_each(move |_| {
println!("Call query for_each");
let rx = responder.clone();
let _res = rx.query();
Ok(())
})
.map_err(|_| ());
handle.spawn(server);

loop {
lp.turn(None);
}
}

使用 Cargo.toml 依赖项:

[dependencies]
futures = "0.1"
tokio-core = "0.1"
tokio-timer = "0.1"

这段代码死锁了。输出是:

Call query for_each
enter query
send query
Sink flushed

预期输出是:

Call query for_each
enter query
send query
Sink flushed
resolve
resolve answered
Call query for_each
enter query
send query
Sink flushed
resolve
resolve answered
....

表示以tx结尾的请求已经成功发送到内部服务器,但内部服务器没有处理。根据我的理解,mpsc 和 oneshot 可用于在任务之间传输,而不仅仅是线程,因此包含线程不应像它那样死锁。

这里有什么问题吗?

最佳答案

阅读后Aaron's blog , future 的概念现在更加清晰了。我的第一种方法不是需求驱动的,因此是不充分的。函数 resolve() 实际上应该返回 future 而不是结果。

为了适本地结束这个问题,这里是我修改的、进一步简化的最小示例来展示这个概念:

extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;

use std::time;
use std::time::{Instant,Duration};
use tokio_core::reactor::{Core, Interval};
use tokio_timer::wheel;
use futures::{Future,Stream,Sink};
use futures::sync::{oneshot,mpsc};

type MsgRequest<A,E> = oneshot::Sender<Result<A,E>>;

fn main() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();

let (fut_tx, fut_rx) = mpsc::channel::<MsgRequest<u8,String>>(100);
let handle2 = handle.clone();
let resolver = fut_rx.and_then(move |tx| {
println!("Got query...wait a bit");
let delay = time::Duration::from_secs(5);
handle2.spawn({
wheel().build().sleep(delay)
.then(move |_|{
println!("Answer query");
tx.send(Ok(0)).unwrap();
println!("query answered");
Ok(())
})
});
Ok(())
})
.for_each(|_| {Ok(())});
handle.spawn(resolver);

let server = Interval::new_at(Instant::now(),
Duration::new(2,0),&handle).unwrap()
.then(move |_| {
let fut_tx = fut_tx.clone();
let (res_tx, res_rx) = oneshot::channel::<Result<u8,String>>();
println!("send query");
fut_tx.send(res_tx)
.then( |tx|{
if let Ok(_tx) = tx { println!("Sink flushed"); }
res_rx
})
})
.for_each(|res| {
println!("Received result {:?}",res);
Ok(())
}).map_err(|_| ());
handle.spawn(server);

loop {
lp.turn(None);
}
}

它按预期输出:

send query
Sink flushed
Got query...wait a bit
Answer query
query answered
Received result Ok(0)
send query
Sink flushed
Got query...wait a bit
Answer query
query answered
Received result Ok(0)
...

关于rust - 使用 Tokio 的 mpsc 和 oneshot 会导致死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47929646/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com