- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想构建一个多生产者多消费者 (MPMC) channel ,其中包含处理和生成数据的不同并发任务。其中一些任务负责与文件系统或网络接口(interface)。
两个例子:
PrintOutput(String)
将由记录器、控制台输出或 GUI 使用。
NewJson(String)
将由记录器或解析器使用。
为此,我选择了 chan
作为 MPMC channel 提供商和 tokio
作为管理 channel 上每个听众事件循环的系统。
阅读 tokio's site 上的示例后,我开始实现futures::stream::Stream
对于 chan::Receiver
.这将允许为每个 future 使用 a 来收听 channel 。然而,这两个库的文档突出了一个冲突:
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>
Attempt to pull out the next value of this stream, returning None if the stream is finished.
This method, like Future::poll, is the sole method of pulling out a value from a stream. This method must also be run within the context of a task typically and implementors of this trait must ensure that implementations of this method do not block, as it may cause consumers to behave badly.
fn recv(&self) -> Option<T>
Receive a value on this channel.
If this is an asnychronous channel, recv only blocks when the buffer is empty.
If this is a synchronous channel, recv only blocks when the buffer is empty.
If this is a rendezvous channel, recv blocks until a corresponding send sends a value.
For all channels, if the channel is closed and the buffer is empty, then recv always and immediately returns None. (If the buffer is non-empty on a closed channel, then values from the buffer are returned.)
Values are guaranteed to be received in the same order that they are sent.
This operation will never panic! but it can deadlock if the channel is never closed.
chan::Receiver
当缓冲区为空时可能会阻塞,但是 futures::stream::Stream
期望在轮询时永远不会阻塞。
如果一个空缓冲区阻塞,则没有明确的方法来确认它是空的。如何检查缓冲区是否为空以防止阻塞?
尽管Kabuki在我的雷达上,似乎是最成熟的 Actor 模型 crate ,它几乎完全缺乏文档。
到目前为止,这是我的实现:
extern crate chan;
extern crate futures;
struct RX<T>(chan::Receiver<T>);
impl<T> futures::stream::Stream for RX<T> {
type Item = T;
type Error = Box<std::error::Error>;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
let &mut RX(ref receiver) = self;
let item = receiver.recv();
match item {
Some(value) => Ok(futures::Async::Ready(Some(value))),
None => Ok(futures::Async::NotReady),
}
}
}
我已经完成了一个快速测试,看看它是如何工作的。看起来不错,但正如预期的那样,在完成缓冲区后确实会阻塞。虽然这应该有效,但我有点担心消费者“行为不端”意味着什么。现在我将继续测试这种方法,希望我不会遇到不良行为。
extern crate chan;
extern crate futures;
use futures::{Stream, Future};
fn my_test() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let (tx, rx) = chan::async::<String>();
tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.
let incoming = RX(rx).for_each(|s| {
println!("Result: {}", s);
Ok(())
});
core.run(incoming).unwrap()
}
最佳答案
chan
crate 提供了一个 chan_select
允许非阻塞 recv
的宏;但是要为这些原语实现 Future
,您还需要在 channel 准备就绪时唤醒任务(请参阅 futures::task::current()
)。
您可以使用现有的原语实现Future
;实现新的通常更困难。在这种情况下,您可能必须 fork chan
以使其与 Future
兼容。
似乎multiqueue
crate 有一个Future
兼容的mpmc channel mpmc_fut_queue
.
关于multithreading - 防止 `chan::Receiver` 在空缓冲区上阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46871542/
这个问题在这里已经有了答案: Possible to make an event handler wait until async / Promise-based code is done? (2
我经常有多个运行的进程(R,Python,eshell/shell),对于每个进程,我经常都有一个相关的脚本,可以从中发送摘要。为此,我通常将每个框架垂直地分成两个窗口,以便脚本文件(例如.py)位于
如何修改 emacs 在关闭缓冲区后选择要显示的缓冲区的方式? 当我有多个列显示相同的缓冲区,然后在其中一个缓冲区中打开另一个文件,然后关闭新打开的缓冲区时,它不会切换回前一个缓冲区,而是切换到另一个
如何将 ex 命令复制到剪贴板或粘贴到缓冲区? 在 Windows 上使用 gvim。 最佳答案 windows剪贴板可以通过the buffer + 访问.因此,可以使用 + 将剪贴板粘贴为前命令。
在 javascript 中如何以比以下更简单的方式获取 b 缓冲区? var num=6553599 var a = new Buffer(4); a.writeInt32LE(num)
每次我在 Google 上搜索有关 OpenGL 编程的文章时,我都会找到一些文章,但似乎所有文章都提到了着色器和缓冲区。那些是什么?你能解释其中的一些吗: 深度缓冲区 模板缓冲区 像素着色器 帧缓冲
我有java考试,当我学习时,我看到了这个练习,我尝试解决它,但我发现一些困难,所以请帮助我考虑实用程序中方法的以下注释、 header 和部分代码名为 Atbash 的加密类。 /**
每次我在 Google 上搜索有关 OpenGL 编程的文章时,我都会找到一些文章,但似乎所有文章都提到了着色器和缓冲区。那些是什么?你能解释其中的一些吗: 深度缓冲区 模板缓冲区 像素着色器 帧缓冲
对于每个属性使用跨步顶点缓冲区与紧密打包缓冲区有何优缺点?我的意思是例如: 步幅:xyzrgb xyzrgb xyzrgb 紧:xyzxyzxyz rgbrgbrgb 乍一看,使用步幅时您似乎可以轻松
我正在尝试将文本文件中每行的数字读取到 ArrayList 中。当我执行以下函数时,它总是跳过最后一个元素。有人可以帮我吗?因为我在这里没有遇到问题,因为它读取直到缓冲区为空,所以他应该在到达 Fil
#include #include int main () { time_t time_raw_format; struct tm * ptr_time; char *buff
基本上我有一个包含不同类型数据的自定义结构。例如: typedef struct example_structure{ uint8_t* example_1[4]; int example_2[4];
我之前的列表实现是一个简单的 LinearLayout,位于一个装满我的项目的 ScrollView 中。 我切换到 ListView 的 Android 实现以简单地使用 CursorAdapter
我想创建一个可变长度的输入事件窗口/缓冲区,当它接收到额外的事件时会变长。 这是为了实现“键入时搜索”功能。我想捕获点击,但为了不给服务器造成压力,我想明智地进行服务调用。 我想到的逻辑是缓冲击键,从
我想将 yuv420P 像素写入缓冲区而不是二进制文件。假设我在指针中存储了 luma 、 Cb 和 Cr。 luma = output_pixel.luma; cb = output_pixel.c
我想在 Go 中构建一个支持多个并发读取器和一个写入器的缓冲区。所有写入缓冲区的内容都应由所有读者读取。允许新读者随时加入,这意味着已经写入的数据必须能够为迟到的读者回放。 缓冲区应满足以下接口(in
本文转载自微信公众号「小明菜市场」,作者小明菜市场。转载本文请联系小明菜市场公众号。 前言 Java NIO 需要理解的主要有缓冲区,通道,选择器,这三个主要的部分。 基础
一 点睛 NIO,可以称为 New IO 或 Non Blocking IO,是在 JDK 1.4 后提供的新 API。传统的I/O 是阻塞式的 I/O、面向流的操作;而 NIO 是非阻塞 I/O 、
我正在寻找一种切换到包含搜索文本的缓冲区的方法。 例如。如果我打开了 100 个缓冲区,我想切换到一个包含 'fooBar = 1' 的缓冲区 最佳答案 我写了一个 Vim 插件来做到这一点:buff
我正在尝试将提取的视频帧(我使用 ffmpeg)推送到 FFMPEG 缓冲区中。我已经查看了 ffmpeg 的缓冲区源文件,例如 buffersrc.c 和 fifo.c,以确定我是否可以这样做,但我
我是一名优秀的程序员,十分优秀!