gpt4 book ai didi

c# - 如何使 IObservable 的实现成为多线程的?

转载 作者:太空宇宙 更新时间:2023-11-03 11:40:57 24 4
gpt4 key购买 nike

我根据 [Rx DEVHOL202] 和 http ://rxwiki.wikidot.com/101samples#toc48 中的示例编写了一个实现

这是我的代码。 http://csharp.pastebin.com/pm2NAPx6

  1. 它有效,但对 OnNext 的调用不是非阻塞的,这是我想实现的以模拟网络读取并在读取到处理程序时异步移交每个字节 block [这不是此处完整显示,但可能会缓存结果并进行进一步处理。

    这样做的好方法是什么?

  2. 一旦抛出异常,所有后续的 OnNext() 都不会被处理!!如果我没有明确退出循环并指示完成。为什么会这样?

最佳答案

我强烈建议反对尝试实现您自己的IObservable。隐式规则超越线程安全并进入方法调用顺序。

通常你会从方法中返回IObservable实例,但如果你需要一个直接实现它的类,你应该包装一个Subject:

public class SomeObservable<T> : IObservable<T>
{
private Subject<T> subject = new Subject<T>();

public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
}

1.您需要注意观察者如何支持这一点(因为您可能共享数据),但您可以通过以下两种方式之一来异步处理调用:

  • 在调用 Subscribe 之前调用 ObserveOn(Scheduler.TaskPool)(或 ThreadPool,如果您是 4.0 之前的版本)。这会导致消息通过调度程序(在本例中为任务)进行路由
  • IScheduler传递给Observer构造函数
  • 从您的订阅者启动一个异步任务/线程

2。这是预期的功能。 IObservableIObserver 之间的契约是 (OnNext)* (OnCompleted | OnError)?,也就是说“零次或多次调用 OnNext ,可以选择后跟 OnCompleted 或 OnError”。 OnCompleted|OnError后,调用OnNext无效。

Rx 中的所有运算符(WhereSelect 等)都强制执行此规则,即使源不这样做也是如此。

关于c# - 如何使 IObservable<T> 的实现成为多线程的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4823909/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com