gpt4 book ai didi

concurrency - 为什么 `futures::channel::mpsc`只能通知一个发送者?

转载 作者:行者123 更新时间:2023-11-29 08:13:49 24 4
gpt4 key购买 nike

我正在阅读 futures-preview 0.3 源代码以了解如何正确地“通知任何”。在 mpsc::channel(有界)中,多个发送者可能会等待接收(在缓冲区已满的情况下)。

研究 next_message 的实现和 unpark_one , 收件人似乎每一张收据只通知一个发件人。

我怀疑这是否适用于 select! ,因为 select! 可能会导致错误通知。但是,我无法产生问题案例。

这是我试图混淆 mpsc 的尝试:

[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"

还有这个:

#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;

use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
let channel_len = 1;
let num_false_wait = 1000;
let num_expected_messages = 100;

let (mut send, mut recv) = channel(channel_len);
// One extra capacity per sender. Fill the extras.
await!(send.send(-2)).unwrap();

// Fill buffers
for _ in 0..channel_len {
await!(send.send(-1)).unwrap();
}

// False waits. Should resolve and produce false waiters.
for _ in 0..num_false_wait {
await!(false_wait(&send));
}

// True messages.
{
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
for i in 0..num_expected_messages {
await!(send.send(i)).unwrap();
}
Ok(())
}.boxed().compat());
}

// Drain receiver until all true messages are received.
let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
while !expects.is_empty() {
let i = await!(recv.next()).unwrap();
expects.remove(&i);
eprintln!("Received: {}", i);
}
}

// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
let (wait_send, wait_recv) = oneshot::channel();
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
let mut sending = send.send(-3);
let mut fallback = future::ready(());
select! {
sending => {
sending.unwrap();
},
fallback => {
eprintln!("future::ready is selected");
},
};
wait_send.send(()).unwrap();
Ok(())
}.boxed().compat());
await!(wait_recv).unwrap();
}

fn main() {
tokio::run(async {
await!(main2());
Ok(())
}.boxed().compat());
}

我希望这会发生:

  1. 缓冲区由-1填充。因此,以后的发件人将被阻止。
  2. 既有“真服务员”也有“假服务员”。假服务员已经退出,因为 select! 的另一臂立即完成。
  3. 在每次调用 await!(recv.next()) 时,最多 一个 等待发件人通知。如果一个假服务员被通知,没有人可以推到缓冲区,即使缓冲区有空房间。
  4. 如果所有元素都在没有真正通知的情况下被耗尽,整个系统都卡住了。

尽管如我所料,main2 异步函数已成功完成。为什么?

最佳答案

进一步调查 futures 源代码解决了我的问题。终于不能这样混淆mpsc了。

重点是,mpsc 的大小是灵活的,可以增长到比最初指定的更多。此行为是 mentioned in the docs :

The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

是的,我在做实验之前先读过这个,但当时我无法弄清楚这个的重要性。

固定缓冲区问题

考虑一个典型的有界队列实现,其中队列的大小不能增长超过最初指定的大小。规范是这样的:

  • 当队列为空时,接收者阻塞。
  • 当队列已满(即大小达到界限)时,发送者将阻塞。

在这种情况下,如果队列已满,多个发送者正在等待一个资源(队列的大小)。

在多线程编程中,这是通过 notify_one 等原语完成的。然而,在 futures 中,这是容易出错的:与多线程编程不同,通知的任务不一定使用资源,因为任务可能已经放弃获取资源(由于像 select! 这样的构造或 Deadline ) 然后规范就被破坏了(队列未满,但所有事件的发件人都被阻止)。

mpsc 灵活

如上所述,futures::channel::mpsc::channel 的缓冲区大小并不严格。规范总结如下:

  • message_queue.len() == 0 时,接收者阻塞。
  • message_queue.len() >= buffer 时,发件人可能阻止。
  • message_queue.len() >= buffer + num_senders 时,发件人阻止。

在这里,num_senders 基本上Sender 的克隆数,但在某些情况下会更多。更准确地说,num_sendersSenderTask 的个数。

那么,我们如何避免资源共享呢?我们有额外的状态:

  • 每个发送者(SenderTask 的实例)都有 is_parked bool 状态。
  • 该 channel 有另一个名为 parked_queue 的队列,一个 Arc 队列引用 SenderTask

channel 维护以下不变量:

  • message_queue.len() <= buffer + num_parked_senders 。请注意,我们不知道 num_parked_senders 的值。
  • parked_queue.len() == min(0, message_queue.len() - buffer)
  • 每个驻留的发件人在 parked_queue 中至少有一条消息。

这是通过以下算法完成的:

  • 对于接收,
    • 它从 SenderTask 弹出一个 parked_queue,如果发件人已停放,则取消停放。
  • 对于发送,
    • 它总是等待 is_parked 变为 false。如果 message_queue.len() < bufferparked_queue.len() == 0 ,则所有发件人都未停放。因此,我们可以保证在这种情况下取​​得进展。
    • 如果 is_parkedfalse ,无论如何将消息推送到队列。
    • 在那之后,如果是 message_queue.len() <= buffer ,它不需要做任何进一步的事情。
    • 如果是 message_queue.len() > buffer ,发送者将被解除停放并推送到 parked_queue

您可以轻松地检查上述算法中是否保持了不变量。

令人惊讶的是,发送者不再等待共享资源。相反,发送方等待其 is_parked 状态。即使发送任务在完成之前被丢弃,它也只是在 parked_queue 中停留一段时间,不会阻塞任何东西。多聪明啊!

关于concurrency - 为什么 `futures::channel::mpsc`只能通知一个发送者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53245906/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com