gpt4 book ai didi

rust - CpuPool 的生命周期问题

转载 作者:行者123 更新时间:2023-11-29 08:33:11 25 4
gpt4 key购买 nike

我正在使用 tokio 和 futures-cpupool 编写一个简单的 RPC 服务器。服务器持有一个 BTreeMap 盒装闭包,函数名称作为键。当前的实现非常简单:

pub struct SlackerServiceSync<T>
where T: Send + Sync + 'static
{
functions: Arc<BTreeMap<String, RpcFnSync<T>>>,
pool: CpuPool,
}

impl<T> SlackerServiceSync<T>
where T: Send + Sync + 'static
{
pub fn new(functions: Arc<BTreeMap<String, RpcFnSync<T>>>,
threads: usize)
-> SlackerServiceSync<T> {
let pool = CpuPool::new(threads);
SlackerServiceSync { functions, pool }
}
}

impl<T> Service for SlackerServiceSync<T>
where T: Send + Sync + 'static
{
type Request = SlackerPacket<T>;
type Response = SlackerPacket<T>;
type Error = io::Error;
type Future = BoxFuture<Self::Response, Self::Error>;

fn call(&self, req: Self::Request) -> Self::Future {
match req {
SlackerPacket::Request(sreq) => {
debug!("getting request: {:?}", sreq.fname);
if let Some(f) = self.functions.get(&sreq.fname) {
self.pool
.spawn_fn(move || -> FutureResult<T, Self::Error> {
ok(f(&sreq.arguments))
})
.and_then(move |result| {
debug!("getting results");
ok(SlackerPacket::Response(SlackerResponse {
version: sreq.version,
code: RESULT_CODE_SUCCESS,
content_type: sreq.content_type,
serial_id: sreq.serial_id,
result: result,
}))
})
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Oneshot canceled"))
.boxed()
} else {
let error = SlackerError {
version: sreq.version,
code: RESULT_CODE_NOT_FOUND,
serial_id: sreq.serial_id,
};
ok(SlackerPacket::Error(error)).boxed()
}
}
SlackerPacket::Ping(ref ping) => {
ok(SlackerPacket::Pong(SlackerPong { version: ping.version })).boxed()
}
_ => err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported packet")).boxed(),
}
}
}

我目前被 self.functions.get(&sreq.fname) 上的这个生命周期问题阻止了。

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
--> src/service.rs:103:49
|
103 | if let Some(f) = self.functions.get(&sreq.fname) {
| ^^^
|
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 99:55...
--> src/service.rs:99:56
|
99 | fn call(&self, req: Self::Request) -> Self::Future {
| ________________________________________________________^
100 | | match req {
101 | | SlackerPacket::Request(sreq) => {
102 | | debug!("getting request: {:?}", sreq.fname);
... |
133 | | }
134 | | }
| |_____^
note: ...so that reference does not outlive borrowed content
--> src/service.rs:103:34
|
103 | if let Some(f) = self.functions.get(&sreq.fname) {
| ^^^^^^^^^^^^^^
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/service.rs:105:35: 107:36 f:&std::boxed::Box<for<'r> std::ops::Fn(&'r std::vec::Vec<T>) -> T + std::marker::Send + std::marker::Sync>, sreq:packets::SlackerRequest<T>]` will meet its required lifetime bounds
--> src/service.rs:105:26
|
105 | .spawn_fn(move || -> FutureResult<T, Self::Error> {
| ^^^^^^^^

Similar code works without CpuPool .我无法完全理解编译器报告的错误。

Full code is here

最佳答案

事实证明,我需要通过像这样声明 RcpFnSync 将闭包包装到 Arc 中:

pub type RpcFnSync<T> = Arc<Fn(&Vec<T>) -> T + Send + Sync + 'static>;

然后在发送到另一个线程之前克隆它:

fn call(&self, req: Self::Request) -> Self::Future {
match req {
SlackerPacket::Request(sreq) => {
debug!("getting request: {:?}", sreq.fname);
if let Some(fi) = self.functions.get(&sreq.fname) {
let f = fi.clone();

self.pool
.spawn_fn(move || -> FutureResult<Self::Response, Self::Error> {
let result = f(&sreq.arguments);
ok(SlackerPacket::Response(SlackerResponse {
version: sreq.version,
code: RESULT_CODE_SUCCESS,
content_type: sreq.content_type,
serial_id: sreq.serial_id,
result: result,
}))
})
.boxed()
} else {
let error = SlackerError {
version: sreq.version,
code: RESULT_CODE_NOT_FOUND,
serial_id: sreq.serial_id,
};
ok(SlackerPacket::Error(error)).boxed()
}
}
SlackerPacket::Ping(ref ping) => {
ok(SlackerPacket::Pong(SlackerPong { version: ping.version })).boxed()
}
_ => err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported packet")).boxed(),
}
}

关于rust - CpuPool 的生命周期问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43708422/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com