gpt4 book ai didi

c# - Rx CatchAll 用于未观察到的消息

转载 作者:太空宇宙 更新时间:2023-11-03 21:16:16 25 4
gpt4 key购买 nike

我想知道是否可以为 Rx IObservable 设置某种包罗万象的东西。

它的行为如下:“如果没有其他订阅者观察到此消息,则执行 [something]”。

现在我连接了几个彼此不知道的 Observable 处理程序,并根据某些属性过滤事件。如果我们收到一条未被处理的消息,我想抛出一个错误,因为它将是一条无效消息。

最佳答案

我认为这是一个有趣的问题,所以我去写了一个中等规模的解决方案。我将解决方案分为三个部分:通用实现、示例用法和解释。

实现

public interface ITracked<out T>
{
T Value { get; }
bool IsObserved { get; }
void Observe();
}

public class Tracked<T> : ITracked<T>
{
private readonly T value;
public Tracked(T value)
{
this.value = value;
}
public T Value
{
get { return value; }
}
public bool IsObserved { get; private set; }
public void Observe()
{
IsObserved = true;
}
}

public interface ITrackableObservable<out T> : IObservable<ITracked<T>>
{
IObservable<T> Unobserved { get; }
}

public class TrackableObservable<T> : ITrackableObservable<T>
{
private readonly ISubject<T> unobserved = new Subject<T>();
private readonly IObservable<ITracked<T>> source;

public TrackableObservable(IObservable<T> source)
{
this.source = Observable
.Create<ITracked<T>>(observer => source.Subscribe(
value =>
{
var trackedValue = new Tracked<T>(value);
observer.OnNext(trackedValue);
if (!trackedValue.IsObserved)
{
unobserved.OnNext(value);
}
},
observer.OnError,
observer.OnCompleted))
.Publish()
.RefCount();
}

public IObservable<T> Unobserved
{
get { return unobserved.AsObservable(); }
}

public IDisposable Subscribe(IObserver<ITracked<T>> observer)
{
return source.Subscribe(observer);
}
}

public static class TrackableObservableExtensions
{
public static ITrackableObservable<T> ToTrackableObservable<T>(this IObservable<T> source)
{
return new TrackableObservable<T>(source);
}

public static IObservable<T> Observe<T>(this IObservable<ITracked<T>> source)
{
return source.Do(x => x.Observe()).Select(x => x.Value);
}

public static IObservable<T> ObserveWhere<T>(this IObservable<ITracked<T>> source, Func<T, bool> predicate)
{
return source.Where(x => predicate(x.Value)).Observe();
}
}

例子

public class Animal
{
public int ID { get; set; }
public string Kind { get; set; }
public string Name { get; set; }
}

...

IObservable<Animal> animals = ...;
ITrackableObservable<Animal> trackableAnimals = animals.ToTrackableObservable();
trackableAnimals
.ObserveWhere(a => a.Kind == "Cat")
.Subscribe(a => Console.WriteLine("{0}: Meow", a.ID));
trackableAnimals
.ObserveWhere(a => a.Kind == "Dog")
.Subscribe(a => Console.WriteLine("{0}: Woof", a.ID));
trackableAnimals
.ObserveWhere(a => a.Name != null)
.Subscribe(a => Console.WriteLine("{0}: {1} named {2}", a.ID, a.Kind, a.Name));
trackableAnimals
.Unobserved
.Subscribe(a => Console.WriteLine("{0}: {1} with no name (unobserved)", a.ID, a.Kind));

如果animals将发出这个序列:

new Animal { ID = 1, Kind = "Cat", Name = "Rusty" }
new Animal { ID = 2, Kind = "Horse" }
new Animal { ID = 3, Kind = "Dog", Name = "Fido" }
new Animal { ID = 4, Kind = "Dog" }
new Animal { ID = 5, Kind = "Bird", Name = "Simon" }

然后我们会看到这个输出:

1: Meow
1: Cat named Rusty
2: Horse with no name (unobserved)
3: Woof
3: Dog named Fido
4: Woof
5: Bird named Simon

说明

这里的想法是确保所有订阅者最终共享对源序列的单个订阅,并且每个值都有一个 bool。附在它上面,说明该值是否已被观察到。每当源序列发出 T ,我们将其包装为 ITracked<T>然后将该单个实例传递给所有订阅者。然后观察者可以选择将值标记为观察到的值。一次都OnNext如果 ITracked<T>没有被标记为观察到的,那么我们就知道它是未观察到的。

TrackableObservableExtensions类提供了一些扩展方法,使实现的工作更加流畅,但它们不是实现的一部分所必需的。

关于c# - Rx CatchAll 用于未观察到的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34075006/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com