gpt4 book ai didi

c# - 使用异步函数订阅可观察序列

转载 作者:太空狗 更新时间:2023-10-29 22:21:38 26 4
gpt4 key购买 nike

我有一个 asnyc我想对 IObservable 中的每个观察调用的函数顺序,一次限制对一个事件的传递。消费者期望在传输过程中不超过一条消息;如果我理解正确的话,这也是 RX 合约。

考虑这个示例:

static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}

static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}

Consume函数伪造了 750 毫秒的处理时间,并且 ob每 100 毫秒产生一次事件。上面的代码有效,但调用了 task.Wait()在随机线程上。如果我改为在注释掉的第 3 行中订阅,那么 Consume以与 ob 相同的速率被调用产生事件(我什至无法理解我在这个注释语句中使用的 Subscribe 的重载,所以这可能是无稽之谈)。

那么我如何一次正确地将一个事件从可观察序列传递到 async功能?

最佳答案

订阅者不应该长时间运行,因此不支持在订阅处理程序中执行长时间运行的异步方法。

相反,将您的异步方法视为从另一个序列获取值的单值可观察序列。现在您可以组合序列,这正是 Rx 的设计目的。

现在您已经实现了这一飞跃,您可能会得到类似于@Reijher 在 Howto call back async function from rx subscribe? 中创建的东西.

他的代码分解如下。

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();

在这种情况下,您将创建隐式队列。在生产者比消费者快的任何问题中,都需要使用队列在等待时收集值。我个人更喜欢通过将数据放入队列来明确这一点。或者,您可以显式使用调度程序来指示线程模型应该弥补不足。

对于 Rx 新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。指南不将它们放入您的订阅者中的原因有很多,例如: 1. 你破坏了错误模型 2. 你正在混合异步模型(这里是 rx,那里是 task) 3. subscribe 是异步序列组合的消费者。异步方法只是一个单值序列,因此该 View 不能作为序列的结尾,但它的结果可能是。

更新

为了说明关于打破错误模型的评论,这里更新了 OP 示例。

void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));

Thread.Sleep(10000);
d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}

在这里我们可以看到,如果 OnNext 处理程序抛出异常,那么我们将不受 Rx OnError 处理程序的保护。异常将得不到处理,很可能会导致应用程序崩溃。

关于c# - 使用异步函数订阅可观察序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37129159/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com