gpt4 book ai didi

multithreading - 防止 `chan::Receiver` 在空缓冲区上阻塞

转载 作者:行者123 更新时间:2023-11-29 08:18:39 24 4
gpt4 key购买 nike

我想构建一个多生产者多消费者 (MPMC) channel ,其中包含处理和生成数据的不同并发任务。其中一些任务负责与文件系统或网络接口(interface)。

两个例子:

  • PrintOutput(String)将由记录器、控制台输出或 GUI 使用。

  • NewJson(String)将由记录器或解析器使用。

为此,我选择了 chan作为 MPMC channel 提供商和 tokio作为管理 channel 上每个听众事件循环的系统。

阅读 tokio's site 上的示例后,我开始实现futures::stream::Stream对于 chan::Receiver .这将允许为每个 future 使用 a 来收听 channel 。然而,这两个库的文档突出了一个冲突:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>

Attempt to pull out the next value of this stream, returning None if the stream is finished.

This method, like Future::poll, is the sole method of pulling out a value from a stream. This method must also be run within the context of a task typically and implementors of this trait must ensure that implementations of this method do not block, as it may cause consumers to behave badly.

fn recv(&self) -> Option<T>

Receive a value on this channel.

If this is an asnychronous channel, recv only blocks when the buffer is empty.

If this is a synchronous channel, recv only blocks when the buffer is empty.

If this is a rendezvous channel, recv blocks until a corresponding send sends a value.

For all channels, if the channel is closed and the buffer is empty, then recv always and immediately returns None. (If the buffer is non-empty on a closed channel, then values from the buffer are returned.)

Values are guaranteed to be received in the same order that they are sent.

This operation will never panic! but it can deadlock if the channel is never closed.

chan::Receiver当缓冲区为空时可能会阻塞,但是 futures::stream::Stream期望在轮询时永远不会阻塞。

如果一个空缓冲区阻塞,则没有明确的方法来确认它是空的。如何检查缓冲区是否为空以防止阻塞?

尽管Kabuki在我的雷达上,似乎是最成熟的 Actor 模型 crate ,它几乎完全缺乏文档。


到目前为止,这是我的实现:

extern crate chan;
extern crate futures;

struct RX<T>(chan::Receiver<T>);

impl<T> futures::stream::Stream for RX<T> {
type Item = T;
type Error = Box<std::error::Error>;

fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
let &mut RX(ref receiver) = self;
let item = receiver.recv();

match item {
Some(value) => Ok(futures::Async::Ready(Some(value))),
None => Ok(futures::Async::NotReady),
}
}
}

我已经完成了一个快速测试,看看它是如何工作的。看起来不错,但正如预期的那样,在完成缓冲区后确实会阻塞。虽然这应该有效,但我有点担心消费者“行为不端”意味着什么。现在我将继续测试这种方法,希望我不会遇到不良行为。

extern crate chan;
extern crate futures;
use futures::{Stream, Future};

fn my_test() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();

let (tx, rx) = chan::async::<String>();

tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.

let incoming = RX(rx).for_each(|s| {
println!("Result: {}", s);

Ok(())
});

core.run(incoming).unwrap()
}

最佳答案

chan crate 提供了一个 chan_select允许非阻塞 recv 的宏;但是要为这些原语实现 Future,您还需要在 channel 准备就绪时唤醒任务(请参阅 futures::task::current())。

您可以使用现有的原语实现Future;实现新的通常更困难。在这种情况下,您可能必须 fork chan 以使其与 Future 兼容。

似乎multiqueue crate 有一个Future 兼容的mpmc channel mpmc_fut_queue .

关于multithreading - 防止 `chan::Receiver` 在空缓冲区上阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46871542/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com