- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试使用 Reactive Extensions for .NET 重写一些代码,但我需要一些关于如何实现我的目标的指导。
我有一个类在低级库中封装了一些异步行为。想一想读取或写入网络的东西。当类(class)启动时,它会尝试连接到环境,成功后会通过从工作线程调用来发回信号。
我想将这种异步行为转变为同步调用,我在下面创建了一个大大简化的示例来说明如何实现:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
在工作线程上运行 AsyncStart
只是模拟库的异步行为的一种方式,并不是我的真实代码的一部分,低级库提供线程并调用我的代码回调。
请注意,如果启动未在超时间隔内完成,Start
方法将抛出一个 TimeoutException
。
我想重写这段代码以使用 Rx。这是我的第一次尝试:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
这是一次不错的尝试,但不幸的是它包含了竞争条件。如果启动完成快(例如,如果delay
为 0)并且如果在 A 点有额外的延迟,则 OnNext
将在 readySubject
在 First
执行之前。本质上,我正在应用 Timeout
和 First
的 IObservable
从未看到启动已完成,并且 TimeoutException
将是而是抛出。
Observable.Defer
似乎就是为了处理这样的问题而创建的。这是使用 Rx 的稍微复杂的尝试:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
现在异步操作不会立即开始,只有在使用 IObservable
时才会开始。不幸的是,仍然存在竞争条件,但这次是在 B 点。如果异步操作在 Defer
lambda 返回之前开始调用 OnNext
,它仍然会丢失并且出现 TimeoutException
将被 Timeout
抛出。
我知道我可以使用像 Replay
这样的操作符来缓冲事件,但我最初的例子没有使用 Rx 没有使用任何类型的缓冲。有没有办法让我使用 Rx 来解决我的问题而没有竞争条件?本质上只有在 IObservable
连接到 Timeout
和 First
?
根据 Ana Betts 的回答,这里是可行的解决方案:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
有趣的是,当 C 点的延迟比 AsyncStart
完成所需的时间长时。 AsyncSubject
保留最后发送的通知,Timeout
和 First
仍将按预期执行。
最佳答案
所以,关于 Rx,我想很多人一开始都知道一件事(包括我自己!):如果您使用任何传统的线程函数,如 ResetEvents、Thread.Sleeps 或其他任何东西,那么您就是在做错误 (tm) - 这就像在 LINQ 中将内容转换为数组,因为您知道基础类型恰好是数组。
要知道的关键是异步函数由返回 IObservable<TResult>
的函数表示。 - 这是让您在某事完成时发出信号的神奇调味料。所以这里是你如何“Rx-ify”一个更传统的异步函数,就像你在 Silverlight 网络服务中看到的那样:
IObservable<byte[]> readFromNetwork()
{
var ret = new AsyncSubject();
// Here's a traditional async function that you provide a callback to
asyncReaderFunc(theFile, buffer => {
ret.OnNext(buffer);
ret.OnCompleted();
});
return ret;
}
This is a decent attempt but unfortunately it contains a race condition.
这是AsyncSubject
的地方进来 - 这确保即使 asyncReaderFunc 击败了订阅,AsyncSubject 仍然会“重播”发生的事情。
所以,现在我们已经有了我们的函数,我们可以对它做很多有趣的事情:
// Make it into a sync function
byte[] results = readFromNetwork().First();
// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})
// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.Subscribe(ms => {
Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
});
// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.First();
关于c# - 使用 Rx 阻塞(并可能超时)异步操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4715850/
我在网上搜索但没有找到任何合适的文章解释如何使用 javascript 使用 WCF 服务,尤其是 WebScriptEndpoint。 任何人都可以对此给出任何指导吗? 谢谢 最佳答案 这是一篇关于
我正在编写一个将运行 Linux 命令的 C 程序,例如: cat/etc/passwd | grep 列表 |剪切-c 1-5 我没有任何结果 *这里 parent 等待第一个 child (chi
所以我正在尝试处理文件上传,然后将该文件作为二进制文件存储到数据库中。在我存储它之后,我尝试在给定的 URL 上提供文件。我似乎找不到适合这里的方法。我需要使用数据库,因为我使用 Google 应用引
我正在尝试制作一个宏,将下面的公式添加到单元格中,然后将其拖到整个列中并在 H 列中复制相同的公式 我想在 F 和 H 列中输入公式的数据 Range("F1").formula = "=IF(ISE
问题类似于this one ,但我想使用 OperatorPrecedenceParser 解析带有函数应用程序的表达式在 FParsec . 这是我的 AST: type Expression =
我想通过使用 sequelize 和 node.js 将这个查询更改为代码取决于在哪里 select COUNT(gender) as genderCount from customers where
我正在使用GNU bash,版本5.0.3(1)-发行版(x86_64-pc-linux-gnu),我想知道为什么简单的赋值语句会出现语法错误: #/bin/bash var1=/tmp
这里,为什么我的代码在 IE 中不起作用。我的代码适用于所有浏览器。没有问题。但是当我在 IE 上运行我的项目时,它发现错误。 而且我的 jquery 类和 insertadjacentHTMl 也不
我正在尝试更改标签的innerHTML。我无权访问该表单,因此无法编辑 HTML。标签具有的唯一标识符是“for”属性。 这是输入和标签的结构:
我有一个页面,我可以在其中返回用户帖子,可以使用一些 jquery 代码对这些帖子进行即时评论,在发布新评论后,我在帖子下插入新评论以及删除 按钮。问题是 Delete 按钮在新插入的元素上不起作用,
我有一个大约有 20 列的“管道分隔”文件。我只想使用 sha1sum 散列第一列,它是一个数字,如帐号,并按原样返回其余列。 使用 awk 或 sed 执行此操作的最佳方法是什么? Accounti
我需要将以下内容插入到我的表中...我的用户表有五列 id、用户名、密码、名称、条目。 (我还没有提交任何东西到条目中,我稍后会使用 php 来做)但由于某种原因我不断收到这个错误:#1054 - U
所以我试图有一个输入字段,我可以在其中输入任何字符,但然后将输入的值小写,删除任何非字母数字字符,留下“。”而不是空格。 例如,如果我输入: 地球的 70% 是水,-!*#$^^ & 30% 土地 输
我正在尝试做一些我认为非常简单的事情,但出于某种原因我没有得到想要的结果?我是 javascript 的新手,但对 java 有经验,所以我相信我没有使用某种正确的规则。 这是一个获取输入值、检查选择
我想使用 angularjs 从 mysql 数据库加载数据。 这就是应用程序的工作原理;用户登录,他们的用户名存储在 cookie 中。该用户名显示在主页上 我想获取这个值并通过 angularjs
我正在使用 autoLayout,我想在 UITableViewCell 上放置一个 UIlabel,它应该始终位于单元格的右侧和右侧的中心。 这就是我想要实现的目标 所以在这里你可以看到我正在谈论的
我需要与 MySql 等效的 elasticsearch 查询。我的 sql 查询: SELECT DISTINCT t.product_id AS id FROM tbl_sup_price t
我正在实现代码以使用 JSON。 func setup() { if let flickrURL = NSURL(string: "https://api.flickr.com/
我尝试使用for循环声明变量,然后测试cols和rols是否相同。如果是,它将运行递归函数。但是,我在 javascript 中执行 do 时遇到问题。有人可以帮忙吗? 现在,在比较 col.1 和
我举了一个我正在处理的问题的简短示例。 HTML代码: 1 2 3 CSS 代码: .BB a:hover{ color: #000; } .BB > li:after {
我是一名优秀的程序员,十分优秀!