- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在阅读 futures-preview
0.3 源代码以了解如何正确地“通知任何”。在 mpsc::channel
(有界)中,多个发送者可能会等待接收(在缓冲区已满的情况下)。
研究 next_message
的实现和 unpark_one
, 收件人似乎每一张收据只通知一个发件人。
我怀疑这是否适用于 select!
,因为 select!
可能会导致错误通知。但是,我无法产生问题案例。
这是我试图混淆 mpsc
的尝试:
[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"
[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"
还有这个:
#![feature(async_await, await_macro, futures_api, pin)]
use std::collections::HashSet;
use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;
async fn main2() {
let channel_len = 1;
let num_false_wait = 1000;
let num_expected_messages = 100;
let (mut send, mut recv) = channel(channel_len);
// One extra capacity per sender. Fill the extras.
await!(send.send(-2)).unwrap();
// Fill buffers
for _ in 0..channel_len {
await!(send.send(-1)).unwrap();
}
// False waits. Should resolve and produce false waiters.
for _ in 0..num_false_wait {
await!(false_wait(&send));
}
// True messages.
{
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
for i in 0..num_expected_messages {
await!(send.send(i)).unwrap();
}
Ok(())
}.boxed().compat());
}
// Drain receiver until all true messages are received.
let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
while !expects.is_empty() {
let i = await!(recv.next()).unwrap();
expects.remove(&i);
eprintln!("Received: {}", i);
}
}
// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
let (wait_send, wait_recv) = oneshot::channel();
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
let mut sending = send.send(-3);
let mut fallback = future::ready(());
select! {
sending => {
sending.unwrap();
},
fallback => {
eprintln!("future::ready is selected");
},
};
wait_send.send(()).unwrap();
Ok(())
}.boxed().compat());
await!(wait_recv).unwrap();
}
fn main() {
tokio::run(async {
await!(main2());
Ok(())
}.boxed().compat());
}
我希望这会发生:
-1
填充。因此,以后的发件人将被阻止。select!
的另一臂立即完成。await!(recv.next())
时,最多 一个 等待发件人通知。如果一个假服务员被通知,没有人可以推到缓冲区,即使缓冲区有空房间。尽管如我所料,main2
异步函数已成功完成。为什么?
最佳答案
进一步调查 futures
源代码解决了我的问题。终于不能这样混淆mpsc了。
重点是,mpsc
的大小是灵活的,可以增长到比最初指定的更多。此行为是 mentioned in the docs :
The channel's capacity is equal to
buffer + num-senders
. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.
是的,我在做实验之前先读过这个,但当时我无法弄清楚这个的重要性。
考虑一个典型的有界队列实现,其中队列的大小不能增长超过最初指定的大小。规范是这样的:
在这种情况下,如果队列已满,多个发送者正在等待一个资源(队列的大小)。
在多线程编程中,这是通过 notify_one
等原语完成的。然而,在 futures
中,这是容易出错的:与多线程编程不同,通知的任务不一定使用资源,因为任务可能已经放弃获取资源(由于像 select!
这样的构造或 Deadline
) 然后规范就被破坏了(队列未满,但所有事件的发件人都被阻止)。
mpsc
灵活如上所述,futures::channel::mpsc::channel
的缓冲区大小并不严格。规范总结如下:
message_queue.len() == 0
时,接收者阻塞。message_queue.len() >= buffer
时,发件人可能阻止。message_queue.len() >= buffer + num_senders
时,发件人阻止。在这里,num_senders
基本上是 Sender
的克隆数,但在某些情况下会更多。更准确地说,num_senders
是 SenderTask
的个数。
那么,我们如何避免资源共享呢?我们有额外的状态:
SenderTask
的实例)都有 is_parked
bool 状态。parked_queue
的队列,一个 Arc
队列引用 SenderTask
。channel 维护以下不变量:
message_queue.len() <= buffer + num_parked_senders
。请注意,我们不知道 num_parked_senders
的值。parked_queue.len() == min(0, message_queue.len() - buffer)
parked_queue
中至少有一条消息。这是通过以下算法完成的:
SenderTask
弹出一个 parked_queue
,如果发件人已停放,则取消停放。is_parked
变为 false
。如果 message_queue.len() < buffer
为 parked_queue.len() == 0
,则所有发件人都未停放。因此,我们可以保证在这种情况下取得进展。is_parked
是 false
,无论如何将消息推送到队列。message_queue.len() <= buffer
,它不需要做任何进一步的事情。message_queue.len() > buffer
,发送者将被解除停放并推送到 parked_queue
。您可以轻松地检查上述算法中是否保持了不变量。
令人惊讶的是,发送者不再等待共享资源。相反,发送方等待其 is_parked
状态。即使发送任务在完成之前被丢弃,它也只是在 parked_queue
中停留一段时间,不会阻塞任何东西。多聪明啊!
关于concurrency - 为什么 `futures::channel::mpsc`只能通知一个发送者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53245906/
我有一个应用程序应该在应用程序处于前台和后台(不在历史记录中)时显示提醒通知。 在前景情况下,我通过以下方法实现了这一点。 PendingIntent pendingIntent = PendingI
如何为我的 WPF 应用程序创建通知,例如浏览器上的通知,它们通过浏览器顶部的“工具栏”显示消息或通过在右下角向上/向下滑动的弹出窗口显示“MSN”样式通知屏幕。也许在应用程序中心淡入/淡出的面板可以
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 9 年前。 Improve
我正在使用 Redis 作为分布式缓存。我有不同的应用程序,它们只听特定的键。例如:App1 听 App1.*App2 监听 App2.* 等等。 我的应用程序使用以下模式接收通知:App1:“ ke
我正在尝试构建一个基于官方节点 docker 镜像的 docker 镜像,我想知道是否有某种方法可以在推送新版本的官方节点镜像时自动重建镜像。这样我的图像就不会基于过时的基础图像。 也许有类似 rss
我在一个项目中工作,我需要在添加或修改文件时在数据库中记录文件信息,以便它们保持同步。这些文件应该存储在 Nextcloud 服务器中,那么 Nextcloud 是否有办法通知这些更改(例如 webh
通知类中的方法via 如何根据用户的偏好动态变化,一个用户可能想通过电子邮件接收,而另一个用户则不想 public function via($notifiable) { return ['d
我有一个应用程序,我正在发送推送通知,如果用户登录到应用程序,这很好 - 但是,如果他们没有/如果他们没有在 X 分钟内阅读通知,我想给他们发送一封电子邮件. 我要解决的方法是使用 Laravel N
我正在使用 Django 的 contrib.comments 并想了解以下内容。 是否有任何实用程序或应用程序可以插入到某个应用程序中,当对某个项目发表评论时向您发送通知? 我并没有真正使用过那么多
我希望用户在启动应用程序之前接受协议(protocol)。所以在 appDelegate.m 中我有以下内容: - (BOOL)application:(UIApplication *)applica
我正在创建一个新指令,我想知道如何在 angular 从 DOM 中删除元素时收到通知。 我的目标是在删除元素时添加 jquery 动画。 最佳答案 如果您尝试对元素的移除进行动画处理,则需要在移除元
我正在编写一个应用程序,其工作方式与Apple的Weather.app非常相似:底部有一个UIPageControl,屏幕中间有一个UIScrollView。在我的代码中,我实现了 - (void)s
如何查明 iPhone 注册了哪些通知? 例如: notify_post("com.apple.springboard/Prefs"); 最佳答案 虽然这个问题的答案已经得到确认,但由于 @Nate
我的 Cocoa 应用程序中有一个 TextField。该文本字段有时会被填充,有时会为空。 我希望当字段为空时按钮被禁用。现在,每当我对 Core Data 执行某些操作时,我都会检查该字段,Tex
我的应用程序在其数据库中包含文档。用户可以打开文档,在这种情况下,文档将保存到临时文件夹并在用户计算机上打开。 我希望在这些临时文件之一发生更改时收到通知,并让用户将更改后的文档保存回数据库。 在 D
我目前正在开发一个网络应用程序,它不断对 php 进行 ajax 调用(轮询),以从数据库中提取新的“任务”,有点像 gmail/facebook 检查新电子邮件和消息的方式。当前的 JavaScri
我正在尝试让通知适用于我使用 Angular 5 和 Electron 制作的 Electron 应用程序。到目前为止,我的 index.html 文件中有以下代码: function doNo
我有一个录音/播放应用程序。它在后台运行。当它进入后台时,如果任何其他音频应用程序打开或开始使用音频资源,我想适本地处理我的应用程序。 iOS 提供了一种发送此类通知的方法,如在 ipod 播放器中看
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 4 年前。 Improve this ques
是否有 Subversion 的工具可以在对某些文件提交更改时自动通知我? 最佳答案 您可以创建一个 post-commit hook script “ Hook ”提交。 在钩子(Hook)脚本中,
我是一名优秀的程序员,十分优秀!