gpt4 book ai didi

stream - 如何从流本身中取消无限流?

转载 作者:行者123 更新时间:2023-11-29 07:55:44 24 4
gpt4 key购买 nike

我试图在清空队列后取消一个间隔 (interval_timer),但不确定什么是正确的策略。

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
.interval(Duration::from_millis(1000))
.map_err(|_| {
println!("Errored out");
});

let s = timer.for_each(move |_| {
println!("Woke up");
let item = some_vars.pop().unwrap();

let f = futures::future::ok(item).map(|x| {
println!("{:?}", x);
});
tokio::spawn(f)
});

tokio::run(s);

我按照 gitter 中的建议尝试了 drop 但最终出现错误:

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
.interval(Duration::from_millis(1000))
.map_err(|_| {
println!("Errored out");
});

let s = timer.for_each(move |_| {
println!("Woke up");
if some_vars.len() == 1 {
drop(interval_timer);
}

let item = some_vars.pop().unwrap();

let f = futures::future::ok(item).map(|x| {
println!("{:?}", x);
});
tokio::spawn(f)
});

tokio::run(s);

错误:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
|
60 | let mut interval_timer = tokio_timer::Timer::default();
| ------------------ captured outer variable
...
72 | drop(interval_timer);
| ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure

最佳答案

对于您想要从流的外部 取消流的情况,请参阅 stream-cancel .


对于您的具体情况,最简单的方法是将您的集合转换为流并将其与间隔计时器压缩在一起。这样,当集合为空时,结果流自然停止:

use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
tokio::run({
let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

let timer =
Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

let some_vars = stream::iter_ok(some_vars.into_iter().rev());
let combined = timer.zip(some_vars);

combined.for_each(move |(_, item)| {
eprintln!("Woke up");

tokio::spawn(future::lazy(move || {
println!("{:?}", item);
Ok(())
}));

Ok(())
})
});
}

否则,您可以通过使用 and_then 从集合中删除值并控制流是否应该继续来停止流:

use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
tokio::run({
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

let timer =
Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

let limited = timer.and_then(move |_| {
if some_vars.len() <= 4 {
Err(())
} else {
some_vars.pop().ok_or(())
}
});

limited.for_each(move |item| {
eprintln!("Woke up");

tokio::spawn(future::lazy(move || {
println!("{:?}", item);
Ok(())
}));

Ok(())
})
});
}

关于stream - 如何从流本身中取消无限流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49235832/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com