- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我希望能够按照这些思路做一些事情,以便异步关闭 Receiver
流:
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use futures::stream::AndThen;
use futures::sync::mpsc::Receiver;
use futures::{Future, Sink, Stream};
use std::sync::{Arc, Mutex};
use tokio::timer::{Delay, Interval};
fn main() {
tokio::run(lazy(|| {
let (tx, rx) = futures::sync::mpsc::channel(1000);
let arc = Arc::new(Mutex::<Option<AndThen<Receiver<u32>, _, _>>>::new(None));
{
let mut and_then = arc.lock().unwrap();
*and_then = Some(rx.and_then(|num| {
println!("{}", num);
Ok(())
}));
}
let arc_clone = arc.clone();
// This is the part I'd like to be able to do
// After one second, close the `Receiver` so that future
// calls to the `Sender` don't call the callback above in the
// closure passed to `rx.and_then`
tokio::spawn(
Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
.map_err(|e| eprintln!("Some delay err {:?}", e))
.and_then(move |_| {
let mut maybe_stream = arc_clone.lock().unwrap();
match maybe_stream.take() {
Some(stream) => stream.into_inner().close(),
None => eprintln!("Can't close non-existent stream"), // line "A"
}
Ok(())
}),
);
{
let mut maybe_stream = arc.lock().unwrap();
let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"
let rx = stream.for_each(|_| Ok(()));
tokio::spawn(rx);
}
tokio::spawn(
Interval::new_interval(std::time::Duration::from_millis(10))
.take(10)
.map_err(|e| {
eprintln!("Interval error?! {:?}", e);
})
.fold((tx, 0), |(tx, i), _| {
tx.send(i as u32)
.map_err(|e| eprintln!("Send error?! {:?}", e))
.map(move |tx| (tx, i + 1))
})
.map(|_| ()),
);
Ok(())
}));
}
但是,A 行运行是因为我必须移动 B 行上的流才能对其调用 .for_each
。如果我不调用 .for_each
(或类似的东西),据我所知,我根本无法执行 AndThen
。我无法在不实际移动对象的情况下调用 .for_each
,因为 for_each
是一种移动方法。
我有可能做我想做的事吗?这似乎绝对是可能的,但也许我遗漏了一些明显的东西。
我使用 0.1 的 future 和 0.1 的 tokio。
最佳答案
不会撒谎,我和@shepmaster 一起解决这个问题,你的问题很不清楚。也就是说,它感觉就像您正在尝试做一些 mpsc
的事情一样的一部分 futures
不适合做。
无论如何。讲解时间。
无论何时组合/组合流(或 future !),每个组合方法都需要 self
, 不是 &self
或 &mut self
正如我认为您可能希望的那样。
当你到达你的这个代码块的那一刻:
{
let mut maybe_stream = arc.lock().unwrap();
let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"
let rx = stream.for_each(|_| Ok(()));
tokio::spawn(rx);
}
...流是从Arc<Option<Receiver<T>>>
中提取的当你take()
它,它的内容被替换为None
.然后在 Tokio react 器上生成它,它开始处理这部分。这rx
现在处于循环中,您不再可用。此外,您的 maybe_stream
现在包含 None
.
延迟一段时间后,您尝试 take()
Arc<Option<Receiver<T>>>
的内容(A 行)。因为现在什么都没有了,你也什么都没有了,因此也没有什么可以关闭的了。您的代码出错了。
而不是传递 mpsc::Receiver
并希望摧毁它,使用一种机制来停止流本身。您可以自己这样做,也可以使用像 stream-cancel
这样的 crate 为你这样做。
DIY 版本在这里,根据您的代码修改:
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};
fn main() {
tokio::run(lazy(|| {
let (tx, rx) = futures::sync::mpsc::channel(1000);
let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let c_b_copy = Arc::clone(&circuit_breaker);
tokio::spawn(
Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
.map_err(|e| eprintln!("Some delay err {:?}", e))
.and_then(move |_| {
// We set the CB to true in order to stop processing of the stream
circuit_breaker.store(true, Ordering::Relaxed);
Ok(())
}),
);
{
let rx2 = rx.for_each(|e| {
println!("{:?}", e);
Ok(())
});
tokio::spawn(rx2);
}
tokio::spawn(
Interval::new_interval(std::time::Duration::from_millis(100))
.take(100)
// take_while causes the stream to continue as long as its argument returns a future resolving to true.
// In this case, we're checking every time if the circuit-breaker we've introduced is false
.take_while(move |_| {
future::ok(
c_b_copy.load(Ordering::Relaxed) == false
);
})
.map_err(|e| {
eprintln!("Interval error?! {:?}", e);
})
.fold((tx, 0), |(tx, i), _| {
tx.send(i as u32)
.map_err(|e| eprintln!("Send error?! {:?}", e))
.map(move |tx| (tx, i + 1))
})
.map(|_| ()),
);
Ok(())
}));
}
添加的take_while()
允许您对流的内容或外部谓词进行操作以继续或停止流。请注意,即使我们使用的是 AtomicBool
, 我们还需要 Arc
由于'static
Tokio 的生命周期要求。
经过评论的讨论,this solution可能更适合您的用例。我有效地实现了一个由断路器覆盖的扇出流。奇迹发生在这里:
impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
match self.inner.as_mut() {
Some(ref mut r) => {
let mut breaker = self.breaker.write().expect("Poisoned lock");
match breaker.status {
false => {
let item = r.poll();
match &item {
&Ok(Async::Ready(Some(ref i))) => {
breaker.registry.iter_mut().for_each(|sender| {
sender.try_send(i.clone()).expect("Dead channel");
});
item
},
_ => item
}
},
true => Ok(Async::Ready(None))
}
}
_ => {
let mut breaker = self.breaker.write().expect("Poisoned lock");
// Stream is over, drop all the senders
breaker.registry = vec![];
Ok(Async::Ready(None))
}
}
}
}
如果状态指示器设置为false,则轮询上述流;然后将结果发送给所有听众。如果 poll
的结果是Async::Ready(None)
(表示推流结束),关闭所有监听 channel 。
如果状态指示器设置为真,则关闭所有监听 channel ,流返回Async::Ready(None)
(并被 Tokio 取消执行)。
FanOut
对象是可克隆的,但只有初始实例才能执行任何操作。
关于rust - 如何关闭已修改和正在执行的 `futures::sync::mpsc::Receiver` 流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53905328/
是否可以使用 MS Sync Framework 同步不同数据库中的不同表(具有完全不同的结构),假设其中一个表具有主键,另一个具有唯一索引,其值与第一个表的 PK 一致(但不能晋级PK)?列名也可以
我正在开发一个使用 Twilio Sync 作为数据库的应用程序。我确实订阅了很多事件,例如 itemAdded、itemUpdated、itemRemoved,以使用 Sync Javascript
我们计划使用 ADO.NET 的同步服务将商店与总部同步,有人建议我通过 进行同步。 WCF 代理 . 通过 WCF 代理同步是否意味着我将能够 通过 http 同步 ?如果没有,是否可以使用同步服务
在 Sync Framework 2.1 中,批处理似乎得到了内置支持,因此在某些提供程序中,我们可以只提及批处理大小和假脱机文件位置(以及一些其他属性),批处理将得到处理。开发人员不必编写自己的批处
'sync' 事件和 Backbone.sync 有什么区别? ...它们分别是什么? 最佳答案 虽然两者都与向服务器同步数据的任务有关,但两者之间没有任何联系。 Backbone.sync实现您的
在 Go 中,我们可以使用: type Data struct { lock *sync.Mutex } 或 type Data struct { lock sync.Mutex
我们在各种客户项目中使用 Microsoft Sync Framework 2.1 进行文件和数据库同步。扩展支持将于 2021 年结束。 所以我的问题是: 微软的同步框架策略是什么?它会是开源的,所
我在这个站点和其他站点上看到了一些帖子,指出问题通常是由将表添加到客户端的 SyncAgent 配置或提供者端的 SyncAdapter 的顺序引起的。我相信我在这两个地方都正确地对表格进行了排序(我
Using an Asynchronous Controller in ASP.NET MVC 的使用 BeginMethod/EndMethod 模式部分指的是 Sync() 方法。它没有链接,我无
这是代码: type someThing struct { sync.Mutex cv *sync.Cond num int } func NewSometh
异步地做多项工作(排序向量,主要是函数计算(计算是计算或内存限制,目前,我可以用以下方式编写这些操作: 使用 Threads.@spawn _f1 = Threads.@spawn f1(x) _f2
我想使用 Rust 和 once_cell实现一些静态常量结构实例,一个静态常量向量包含这些静态结构实例。 示例代码如下: use once_cell::sync::Lazy; pub struct
我想使用 Rust 和 once_cell实现一些静态常量结构实例,一个静态常量向量包含这些静态结构实例。 示例代码如下: use once_cell::sync::Lazy; pub struct
看了sync.Pool的设计,发现是两个逻辑,为什么要用localPool来解决锁竞争。我们可以只使用 chan 来实现一个。 使用 channel 比 sync.pool 快 4 倍! pool除了
我有一个编码为在模糊时运行 save_notes 函数的文本区域。当 save_notes 调用时,我在控制台中得到以下内容: TypeError: Cannot read property 'syn
如果 Waitgroups 和 Mutex 总是需要通过引用传递,我们不能把它做成引用类型(禁止使用它们作为值传递)吗?我的意思是有没有我们需要使用它们按值传递的用例? 最佳答案 当您将任何参数作为值
背景:我正在尝试创建一个完全异步 Node js 演示的小测试。然后我可以看到一个好的样本是什么样的,以便清理我完成的其他一些项目。 这是我的代码 //https://medium.com/@tkss
http://msdn.microsoft.com/en-us/library/dd918848.aspx “了解作用域是表和筛选器的组合很重要。例如,您可以定义一个名为 sales-WA 的筛选作用
我尝试同步我的 Visual Studio 代码设置/扩展。我手动执行的许多同步步骤。 然后我发现了这两个工具 https://code.visualstudio.com/docs/editor/se
我正在尝试移植一个扩展,但我的 JS 非常生疏。 https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/Chro
我是一名优秀的程序员,十分优秀!