gpt4 book ai didi

c# - 从头开始实现 IObservable

转载 作者:IT王子 更新时间:2023-10-29 04:00:12 27 4
gpt4 key购买 nike

Reactive Extensions 附带了许多辅助方法,用于将现有事件和异步操作转换为可观察对象,但您将如何从头开始实现 IObservable

IEnumerable 具有可爱的 yield 关键字,使其非常易于实现。

什么是实现 IObservable 的正确方法?

我需要担心线程安全吗?

我知道支持在特定的同步上下文中回调,但这是我作为 IObservable 作者需要担心的事情还是以某种方式内置?

更新:

这是我的 C# 版本的 Brian 的 F# 解决方案

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;

public void Dispose()
{
Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}

protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}

foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}

protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}

if (exception == null)
{
throw new ArgumentNullException("exception");
}

foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}

protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}

foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}

public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}

lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}

class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}

public void Dispose()
{
dispose();
}
}
}

编辑:如果 Dispose 被调用两次,不要抛出 ObjectDisposedException

最佳答案

official documentation反对用户自己实现 IObservable。相反,用户应该使用工厂方法 Observable.Create

When possible, implement new operators by composing existing operators. Otherwise implement custom operators using Observable.Create

碰巧 Observable.Create 是 Reactive 内部类 AnonymousObservable 的简单包装器:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
if (subscribe == null)
{
throw new ArgumentNullException("subscribe");
}
return new AnonymousObservable<TSource>(subscribe);
}

我不知道他们为什么不公开他们的实现,但是嘿,随便吧。

关于c# - 从头开始实现 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1768974/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com