gpt4 book ai didi

callback - 使用 Tokio 处理 TCP 连接时使用回调

转载 作者:行者123 更新时间:2023-11-29 08:29:12 25 4
gpt4 key购买 nike

我正在尝试拥有一个启动事件循环、监听 TCP 连接并为每个连接调用回调的结构。

(回调将从套接字传递一些预先拥有的数据。在我下面的示例中,我只是将连接的 IP 地址传递给它,但在我的实际代码中,我将解析我通过 serde 接收到的内容到一个结构中并将其传递到回调中。我希望这不会使以下“无效示例”)无效。

我的 Cargo.toml :

[package]
name = "lifetime-problem"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-tcp = "0.1.3"
tokio = "0.1.14"
[[bin]]
name = "lifetime-problem"
path = "main.rs"

main.rs :

use tokio::prelude::*;

struct Test {
printer: Option<Box<Fn(std::net::SocketAddr) + Sync>>,
}

impl Test {
pub fn start(&mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match self.printer {
Some(callback) => { callback(address); }
None => { println!("{}", address); }
}
Ok(())
});
tokio::run(server);
Ok(())
}
}

fn main() {
let mut x = Test{ printer: None };
x.start();
}

我从这段代码开始尝试了几件事(直接从 Tokio example 中采用)。

  1. 如果我使用上面发布的代码,我会得到:

    error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely

    第 24 行(tokio::run(server))。

  2. 如果我添加 Send Fn 上的特征在打印机字段中 XOR 如果我删除 move在闭包中 for_each调用我得到另一个错误:

    error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements

    这让我想到了显然不能比 start 更长久的闭包定义它的方法但是 tokio::run似乎对它有相互矛盾的要求。

您知道我是否以完全错误的方式处理回调模式,或者我的代码中是否存在一些小错误?

最佳答案

要事第一:

编译器将翻译Box<Fn(std::net::SocketAddr) + Sync>Box<Fn(std::net::SocketAddr) + Sync + 'static>除非明确指定生命周期。

让我们看一下错误:

error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely

这是不言自明的。您正在尝试移动 &mut T到另一个线程,但不能,因为 T这里不是Send .发&mut T到另一个线程T也需要是 Send 类型.

这是会产生相同错误的最少代码:

use std::fmt::Debug;

fn func<T> (i:&'static mut T) where T: Debug {
std::thread::spawn(move || {
println!("{:?}", i);
});
}

如果我做 T以上也是 Send 类型,错误消失。但是在您添加 Send 的情况下特质,它给出了生命周期错误。为什么?

&mut self有比函数 start() 更长的生命周期由调用者设置,但不能保证它的 'static .您将此引用移动到传递给线程的闭包中,并且可能会超过它正在关闭的范围,从而导致悬空引用。

这是一个最小版本,会给出同样的错误。

use std::fmt::Debug;

fn func<'a, T:'a> (i:&'a mut T) where T: Debug + Sync + Send {
std::thread::spawn(move || {
println!("{:?}", i);
});
}

Sync这里并不是真正需要的,因为它是 &mut T .改变 &mut T&T (保留 Sync ),也会导致同样的错误。这里的责任在于引用而不是可变性。所以你看,有一些生命周期'a并且它被移动到一个闭包中(给一个线程),这意味着闭包现在包含一个引用(与主上下文不相交)。那么现在,'a 是什么?从另一个线程调用的闭包的角度来看,它将存在多长时间?不可推论!结果,编译器提示说 cannot infer an appropriate lifetime due to conflicting requirements .

如果我们稍微调整一下代码;

impl Test {
pub fn start(&'static mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match &self.printer {
Some(callback) => { callback(address); }
None => { println!("{}", address); }
}
Ok(())
});
tokio::run(server);
Ok(())
}
}

它会编译好的。那里有一个保证 self有一个 'static生命周期。请注意,在 match我们需要传递 &self.printer 的声明,因为你不能离开借用的上下文。

但是,这需要 Test被声明为静态的,也是可变的,如果您有其他选择,这通常不是最好的方法。

另一种方式是;如果你可以通过 Test按值到 start()然后进一步将其移动到 for_each() ,代码看起来像这样:

use tokio::prelude::*;

struct Test {
printer: Option<Box<Fn(std::net::SocketAddr) + Send>>,
}

impl Test {
pub fn start(mut self) -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
let listener = tokio::net::TcpListener::bind(&addr)?;
let server = listener
.incoming()
.map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
.for_each(move |socket: tokio::net::TcpStream| {
let address = socket.peer_addr().expect("");
match &self.printer {
Some(callback) => {
callback(address);
}
None => {
println!("{}", address);
}
}
Ok(())
});
tokio::run(server);
Ok(())
}
}

fn main() {
let mut x = Test { printer: None };
x.start();
}

关于callback - 使用 Tokio 处理 TCP 连接时使用回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54307368/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com