gpt4 book ai didi

stream - 我如何每 N 秒从无界队列中提取消息并将它们生成到 Tokio 处理程序?

转载 作者:行者123 更新时间:2023-11-29 08:35:13 25 4
gpt4 key购买 nike

我正在尝试每隔 N 秒从无界队列中提取消息(它们本身就是 future ),并将它们生成到 Tokio 处理程序中。

我已经尝试了数十种变体,但似乎找不到正确的方法。看起来这应该是可能的,但我总是遇到 future 类型不匹配或以借用问题告终。

这是或多或少显示我想要的代码:

let fut = Interval::new_interval(Duration::from_secs(1))
.for_each(|num| vantage_dequeuer.into_future() )
.for_each(|message:VantageMessage |{
handle.spawn(message);
return Ok(());
})
.map_err(|e| panic!("delay errored; err={:?}", e));

core.run(fut);

完整代码:

extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::future::ok;
use futures::sync::mpsc;
use futures::{Future, Stream};
use std::thread;
use std::time::Duration;
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

fn main() {
let (enqueuer, dequeuer) = mpsc::unbounded();
let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
println!("Message!");
return Ok(());
}));
enqueuer.unbounded_send(new_fut);
let joinHandle = worker(Some(dequeuer));
joinHandle.join();
}

/*
Every second extract one message from dequeuer (or wait if not available)
and spawn it in the core
*/
fn worker(
mut vantage_dequeuer: Option<mpsc::UnboundedReceiver<VantageMessage>>,
) -> thread::JoinHandle<()> {
let dequeuer = dequeuer.take().unwrap();
let joinHandle = thread::spawn(|| {
let mut core = Core::new().unwrap();
let handle = core.handle();
let fut = Interval::new_interval(Duration::from_secs(1))
.for_each(|num| vantage_dequeuer.into_future())
.for_each(|message: VantageMessage| {
handle.spawn(message);
return Ok(());
})
.map_err(|e| panic!("delay errored; err={:?}", e));

core.run(fut);
println!("Returned!");
});
return joinHandle;
}

Playground

error[E0425]: cannot find value `dequeuer` in this scope
--> src/main.rs:33:20
|
33 | let dequeuer = dequeuer.take().unwrap();
| ^^^^^^^^ not found in this scope

error[E0599]: no method named `into_future` found for type `std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>>` in the current scope
--> src/main.rs:38:46
|
38 | .for_each(|num| vantage_dequeuer.into_future())
| ^^^^^^^^^^^
|
= note: the method `into_future` exists but the following trait bounds were not satisfied:
`&mut std::option::Option<futures::sync::mpsc::UnboundedReceiver<std::boxed::Box<(dyn futures::Future<Item=(), Error=()> + std::marker::Send + 'static)>>> : futures::Stream`

最佳答案

IntervalUnboundedReceiver 都是流,所以我会使用 Stream::zip将它们结合起来:

The zipped stream waits for both streams to produce an item, and then returns that pair. If an error happens, then that error will be returned immediately. If either stream ends then the zipped stream will also end.

extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
extern crate tokio_core; // 0.1.17

use futures::{
future::ok,
sync::mpsc,
{Future, Stream},
};
use std::{thread, time::Duration};
use tokio::timer::Interval;
use tokio_core::reactor::Core;

type VantageMessage = Box<Future<Item = (), Error = ()> + Send>;

pub fn main() {
let (tx, rx) = mpsc::unbounded();

let new_fut: VantageMessage = Box::new(ok(()).and_then(|_| {
println!("Message!");
Ok(())
}));
tx.unbounded_send(new_fut).expect("Unable to send");
drop(tx); // Close the sending side

worker(rx).join().expect("Thread had a panic");
}

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
thread::spawn(|| {
let mut core = Core::new().unwrap();
let handle = core.handle();

core.run({
Interval::new_interval(Duration::from_secs(1))
.map_err(|e| panic!("delay errored; err={}", e))
.zip(queue)
.for_each(|(_, message)| {
handle.spawn(message);
Ok(())
})
})
.expect("Unable to run reactor");
println!("Returned!");
})
}

请注意,这实际上并不等待任何衍生的 future 在 react 堆关闭之前完成。如果你想要那样,我会切换到 tokio::runtokio::spawn:

fn worker(queue: mpsc::UnboundedReceiver<VantageMessage>) -> thread::JoinHandle<()> {
thread::spawn(|| {
tokio::run({
Interval::new_interval(Duration::from_secs(1))
.map_err(|e| panic!("delay errored; err={}", e))
.zip(queue)
.for_each(|(_, message)| {
tokio::spawn(message);
Ok(())
})
});
println!("Returned!");
})
}

关于stream - 我如何每 N 秒从无界队列中提取消息并将它们生成到 Tokio 处理程序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52778236/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com