- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在一个 hyper 之间建立一个基于 channel 的通信服务与一tokio溪流。问题是编译器报错如下:
closure is
FnOnce
because it moves the variabletx_queue
out of its environment.
在阅读了 rustc --explain E0525
提供的解释后,似乎 tokio::sync::mpsc::Sender实现了 Clone
但没有实现 Copy
(除非我忽略了什么)。
所以我有点卡住了。如何让我的服务通过 tokio::sync::mpsc
channel 将消息发送到 tokio 流?我确定我错过了一些明显但看不到的东西:/
有问题的代码的摘录(根据@E_net4 的要求进行了修改以使其更短):
extern crate hyper;
extern crate tokio;
extern crate tokio_signal;
use futures::Stream;
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};
use futures::sink::Sink;
use futures::sync::{mpsc, oneshot};
use futures::{future, stream};
fn main() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel::<()>();
let (tx_queue, rx_queue) = mpsc::channel(10);
// ----
runtime.spawn(start_queue(rx_queue));
// ----
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
service_fn_ok(move |_: Request<Body>| {
tx_queue.send(1);
Response::new(Body::from("Hello World!"))
})
});
let graceful = http_server
.with_graceful_shutdown(rx1)
.map_err(|err| eprintln!("server error: {}", err))
.and_then(|_| {
dbg!("stopped");
// TODO: stop order queue listener
Ok(())
});
dbg!("HTTP server listening ...");
runtime.spawn(graceful);
// ----
tx1.send(()).unwrap();
dbg!("exited");
}
pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
#[derive(Eq, PartialEq)]
enum Item {
Value(usize),
Tick,
Done,
}
let items = rx
.map(Item::Value)
.chain(stream::once(Ok(Item::Done)))
.take_while(|item| future::ok(*item != Item::Done));
items
.fold(0, |num, _item| {
dbg!("x");
future::ok(num)
})
.map(|_| ())
}
完整代码可在此处获得:https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e
谢谢:)
最佳答案
futures::sync::mpsc::Sender::send
使用 Sender
并生成一个 Send
对象,这是一个future 必须运行完成才能实际发送数据。如果 channel 已满,它将阻塞,直到其他人从 channel 接收。完成后,它会返回 Sender
,您可以使用它来发送更多数据。
在这种情况下,我认为您不能仅使用 Sender
的单个实例来构建代码。您需要克隆它,以便每次调用服务功能时都有新的克隆。注意两个闭包现在都是move
:
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
// This closure has one instance of tx_queue that was moved-in here.
// Now we make a copy to be moved into the closure below.
let tx_queue = tx_queue.clone();
service_fn_ok(move |_: Request<Body>| {
// This closure has one instance of tx_queue, but it will be called
// multiple times, so it can not consume it. It must make a copy
// before consuming it.
tx_queue.clone().send(111);
Response::new(Body::from("Hello World!"))
})
});
但是,这会给你以下警告:
warning: unused `futures::sink::send::Send` that must be used
正如我所说,send
只是为您提供了一个必须运行才能实际执行发送的 future 。如果忽略返回值,则什么也不会发生。在这种情况下,最好 spawn
它作为一个单独的任务(这样它就不会阻止响应客户端)。要生成它,您需要一个来自运行时的执行程序,还必须为内部闭包克隆它:
let executor = runtime.executor();
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
let tx_queue = tx_queue.clone();
let executor = executor.clone();
service_fn_ok(move |_: Request<Body>| {
executor.spawn(tx_queue.clone().send(111).map(|_| ()).map_err(|err| {
// TODO: Handle the error differenty!
panic!("Error in mpsc {:?}", err);
}));
Response::new(Body::from("Hello World!"))
})
});
关于rust - 任务间的 channel 通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55062400/
编辑备注 由于 Rust(版本:1.42)仍然没有稳定的 ABI ,推荐使用extern (目前相当于extern "C"(将来可能会改变))否则,可能需要重新编译库。 This article解释如
词法分析器/解析器文件位于 here非常大,我不确定它是否适合只检索 Rust 函数列表。也许我自己编写/使用另一个库是更好的选择? 最终目标是创建一种执行管理器。为了上下文化,它将能够读取包装在函数
我试图在 Rust 中展平 Enum 的向量,但我遇到了一些问题: enum Foo { A(i32), B(i32, i32), } fn main() { let vf =
我正在 64 位模式下运行的 Raspberry Pi 3 上使用 Rust 进行裸机编程。我已经实现了一个自旋锁,如下所示: use core::{sync::atomic::{AtomicBool
我无法理解以下示例是如何从 this code 中提炼出来的, 编译: trait A: B {} trait B {} impl B for T where T: A {} struct Foo;
在我写了一些代码和阅读了一些文章之后,我对 Rust 中的移动语义有点困惑,我认为值移动后,它应该被释放,内存应该是无效的。所以我尝试写一些代码来作证。 第一个例子 #[derive(Debug)]
https://doc.rust-lang.org/reference/types/closure.html#capture-modes struct SetVec { set: HashSe
考虑 const-generic 数据结构的经典示例:方矩阵。 struct Matrix { inner: [[T; N]; N] } 我想返回一个结构体,其 const 参数是动态定义的:
以下代码无法编译,因为 x在移动之后使用(因为 x 具有类型 &mut u8 ,它没有实现 Copy 特性) fn main() { let mut a: u8 = 1; let x:
我在玩 Rust,发现了下面的例子: fn main() { let mut x = [3, 4, 5].to_vec(); x; println!("{:?}", x); }
假设一个 Rust 2018 宏定义了一个 async里面的功能。它将使用的语法与 Rust 2015 不兼容。因此,如果您使用 2015 版编译您的 crate,那么宏中的扩展代码不会与它冲突吗?
假设我有一些 Foo 的自定义集合s: struct Bar {} struct Foo { bar: Bar } struct SubList { contents: Vec, }
代码如下: fn inner(x:&'a i32, _y:&'b i32) -> &'b i32 { x } fn main() { let a = 1; { let b
在lifetime_things的定义中,'b的生命周期比'a长,但实际上当我调用这个函数时,x1比y1长,但是这样可以编译成功: //here you could see 'b:'a means
我正在尝试检索 FLTK-RS Widget 周围的 Arc Mutex 包装器的内部值: pub struct ArcWidget(Arc>); impl ArcWidget{ pub
如下代码所示,我想封装一个定时函数,返回一个闭包的结果和执行时间。 use tap::prelude::Pipe; use std::time::{Instant, Duration}; pub fn
我想实现自己的通用容器,这是我正在使用的特征的片段: pub trait MyVec where Self: Default + Clone + IntoIterator, Self:
所需代码: 注释掉的块可以编译并工作,但是我想从嵌套的匹配样式转变为更简洁的函数链 async fn ws_req_resp(msg: String, conn: PgConn) -> Result>
我正在尝试编写一些代码,该代码将生成具有随机值的随机结构。对于结构,我具有以下特征和帮助程序宏: use rand::{thread_rng, Rng}; use std::fmt; pub trai
我有一个带有函数成员的结构: struct Foo { fun: Box, } type FooI = Foo; 这不起作用: error[E0106]: missing lifetime s
我是一名优秀的程序员,十分优秀!