gpt4 book ai didi

c# - 在 Rx 中存储检索 IObservable 订阅状态

转载 作者:太空狗 更新时间:2023-10-29 21:59:01 25 4
gpt4 key购买 nike

[ 这个问题属于 Reactive Extensions (Rx) 的范畴 ]

需要在应用重启时继续的订阅

int nValuesBeforeOutput = 123;

myStream.Buffer(nValuesBeforeOutput).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));

现在我需要序列化和反序列化此订阅的状态,以便下次启动应用程序时缓冲区计数不会从零开始,而是从应用程序退出前的缓冲区计数开始 .

  • 在这种情况下,您如何保持 IObservable.Subscribe() 的状态并稍后加载它?
  • 是否有在 Rx 中保存观察者状态的通用解决方案?



从答案到解决方案

基于 Paul Betts 方法,这是一个在我的初始测试中有效的半通用化实现

使用

int nValuesBeforeOutput = 123;

var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));

扩展方法

    private static bool _alreadyRecording;

public static IObservable<T> Record<T>(this IObservable<T> input,
IRepositor repositor)
{
IObservable<T> output = input;
List<T> records = null;
if (repositor.Deserialize(ref records))
{
ISubject<T> history = new ReplaySubject<T>();
records.ForEach(history.OnNext);
output = input.Merge(history);
}
if (!_alreadyRecording)
{
_alreadyRecording = true;
input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
}
return output;
}

public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
IRepositor repositor)
{
input.Subscribe(i => repositor.Clear());
return input;
}

注释

  • 这不适用于存储依赖于生成值之间的时间间隔的状态
  • 您需要一个支持序列化 T 的序列化器实现
  • _alreadyRecording 如果您多次订阅 myRecordableStream 则需要
  • _alreadyRecording 是一个静态 bool 值,非常丑陋,如果需要并行订阅,则可以防止扩展方法在多个地方使用 - 需要重新实现以备将来使用

最佳答案

对此没有通用的解决方案,制作一个将是 NonTrivial™。您可以做的最接近的事情是使 myStream 成为某种重播 Observable(即不是序列化状态,而是序列化 myStream 的状态并重做工作以使您回到原来的位置)。

关于c# - 在 Rx 中存储检索 IObservable 订阅状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10089215/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com