- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 asnyc
我想对 IObservable
中的每个观察调用的函数顺序,一次限制对一个事件的传递。消费者期望在传输过程中不超过一条消息;如果我理解正确的话,这也是 RX 合约。
考虑这个示例:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
Consume
函数伪造了 750 毫秒的处理时间,并且 ob
每 100 毫秒产生一次事件。上面的代码有效,但调用了 task.Wait()
在随机线程上。如果我改为在注释掉的第 3 行中订阅,那么 Consume
以与 ob
相同的速率被调用产生事件(我什至无法理解我在这个注释语句中使用的 Subscribe
的重载,所以这可能是无稽之谈)。
那么我如何一次正确地将一个事件从可观察序列传递到 async
功能?
最佳答案
订阅者不应该长时间运行,因此不支持在订阅处理程序中执行长时间运行的异步方法。
相反,将您的异步方法视为从另一个序列获取值的单值可观察序列。现在您可以组合序列,这正是 Rx 的设计目的。
现在您已经实现了这一飞跃,您可能会得到类似于@Reijher 在 Howto call back async function from rx subscribe? 中创建的东西.
他的代码分解如下。
//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();
在这种情况下,您将创建隐式队列。在生产者比消费者快的任何问题中,都需要使用队列在等待时收集值。我个人更喜欢通过将数据放入队列来明确这一点。或者,您可以显式使用调度程序来指示线程模型应该弥补不足。
对于 Rx 新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。指南不将它们放入您的订阅者中的原因有很多,例如: 1. 你破坏了错误模型 2. 你正在混合异步模型(这里是 rx,那里是 task) 3. subscribe 是异步序列组合的消费者。异步方法只是一个单值序列,因此该 View 不能作为序列的结尾,但它的结果可能是。
更新
为了说明关于打破错误模型的评论,这里更新了 OP 示例。
void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}
在这里我们可以看到,如果 OnNext
处理程序抛出异常,那么我们将不受 Rx OnError
处理程序的保护。异常将得不到处理,很可能会导致应用程序崩溃。
关于c# - 使用异步函数订阅可观察序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37129159/
我想知道是否有一种方法可以重复记录而不进行排序?有时候,我想保持原始顺序,只想删除重复的记录。 是否可以? 顺便说一句,以下是我所知道的有关重复记录的信息,这些记录最终会进行排序。 1。 proc s
我想更新我的 Activity 中依赖于另一个列表的数据的列表。这两个数据列表都是从我的 View 模型的 Activity 中观察到的。从第一个列表获取数据后,我需要在此列表上运行 for 循环以获
我无法理解这个问题。我怎样才能等待 i==2 完成然后再继续其他 i 的操作? class Observable { constructor() { this.observer
我正在观察这样的 Ember Data RecordArray: myArray: function() { return MyRecord.find(); }.property(), isDir
我想在动画开始时观察 strokeEnd 键路径。但是它不起作用,我哪里出错了? - (void)addAnimation { // do animation CABasicAnima
是否可以在 Algorand 中观看某个交易,就像在以太坊中观看某个事件一样? 最佳答案 官方 algod 和 indexer API 目前不支持在 Algorand 上观看交易/事件。 您可以通过使
我有一个可以拖放到其他 View 之上的 View (可以说是类别)。为了检测我在哪个类别 View 之上,我将它们的帧存储在一个帧数组中,这发生在它们不可见叠加层的 onAppear 中。 (这基于
是否可以将观察者添加到可见性更改(即调用 show() 和 hide())时触发的 DOM 元素?谢谢! 最佳答案 如果您想观察任何对 .show() 或 .hide() 的调用,并且可以访问 jQu
我对保存在 NSUserdefaults 中的特定键的值变化感兴趣。然而,我所拥有的并不适合我。 observeValueForKeyPath 不会被触发。 更新:我想我已经发现了这个问题。如果我使用
我正在寻找在 UITableView 顶部实现捏入/捏出,我已经研究了几种方法,包括这个: Similar question 但是,虽然我可以创建一个 UIViewTouch 对象并将其覆盖到我的 U
我有一个在界面中公开的可变数组。我还公开了数组访问器来修改数组。如果数组内发生任何修改,我将不得不使用 KVO 重置并重新计算一些数据。为了支持 KVO,我使用 array accessors如下图:
当 NSPopupButton 发生变化时如何获得方法调用?谢谢! 最佳答案 您只需添加一个操作方法,就像使用 NSButton 或任何其他控件一样。 关于iphone - 观察 NSPopupBut
我正在尝试让键值观察适用于 NSMutableArray。下面是被观察类 MyObservee 的 .h 文件: @interface MyObservee : NSObject { @pri
我很难理解让 Node.js 进程(异步)运行但仍然触发“退出”状态,以便在 CPU 处理完成后我可以做更多事情。 例如,我有一个 Google 地方信息抓取工具,可以在所有可用的 CPU 上高效地分
我正在尝试编写行为类似于kubectl get pods --watch . 这样,每次 pod 的状态发生变化时,我都会被触发。 我创建了一个 go项目(在集群中运行)并添加以下代码: podsWa
我有这个代码: 当时我需要触发Javascript方法或具有给定 id 的 div 隐藏或显示,这将在屏幕调整大小期间发生(因此 u k-hidden-small ),这可以
我想使用 Couchbase,但我想在一些类似于 RethinkDB 的方式实现更改跟踪。 似乎有很多方法可以将更改从 Couchbase 服务器推送给我。 DCP 点击 XDCR 哪一个是正确的选择
虽然 MutationObserver 允许监视 HTMLElement 属性的显式大小更改,但它似乎没有一种方法/配置允许我监视其大小的隐式更改,这些更改是由浏览器。 这是一个例子: const o
我有一个 auto-carousel 指令,它循环访问链接元素的子元素。 但是,子级尚未加载到 DOM 中,因为它们的 ng-if 表达式尚未解析。 如何确保父指令知道其 DOM 树已发生更改?
有没有办法观察 AngularJS 指令中函数表达式的值变化?我有以下 HTML 和 JavaScript,模板中 {{editable()}} 的插值显示该值计算为 true,而检查 Chrome
我是一名优秀的程序员,十分优秀!