gpt4 book ai didi

c# - 通过 observable 限制重播缓冲区

转载 作者:太空狗 更新时间:2023-10-30 01:18:28 24 4
gpt4 key购买 nike

我有一个包含实时数据的流,以及一个基本上分隔属于一起的实时数据部分的流。现在,当有人订阅实时数据流时,我想为他们重播实时数据。但是我不想记住所有实时数据,只记住自上次其他流发出值以来的部分。

There is an issue这将解决我的问题,因为有一个重放操作符完全符合我的要求(或者至少我认为)。

目前有什么方法可以轻松做到这一点?有没有比下面的方法更好的方法?

private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem>
{
private readonly List<TItem> cached = new List<TItem>();
private readonly IObservable<TDelimiter> delimitersObservable;
private readonly IObservable<TItem> itemsObservable;
public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable)
{
this.itemsObservable = itemsObservable;
this.delimitersObservable = delimitersObservable;
}

public IDisposable Subscribe(IObserver<TItem> observer)
{
lock (cached)
{
cached.ForEach(observer.OnNext);
}

return itemsObservable.Subscribe(observer);
}

public IDisposable Connect()
{
var delimiters = delimitersObservable.Subscribe(
p =>
{
lock (cached)
{
cached.Clear();
}
});
var items = itemsObservable.Subscribe(
p =>
{
lock (cached)
{
cached.Add(p);
}
});
return Disposable.Create(
() =>
{
items.Dispose();
delimiters.Dispose();
lock (cached)
{
cached.Clear();
}
});
}

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}

最佳答案

这是否符合您的要求?它的优点是将所有锁定和竞争条件留给 Rx 专家 :)

private class ReplayWithLimitObservable<T, TDelimiter> : IConnectableObservable<T>
{
private IConnectableObservable<IObservable<T>> _source;

public ReplayWithLimitObservable(IObservable<T> source, IObservable<TDelimiter> delimiter)
{
_source = source
.Window(delimiter) // new replay window on delimiter
.Select<IObservable<T>,IObservable<T>>(window =>
{
var replayWindow = window.Replay();

// immediately connect and start memorizing values
replayWindow.Connect();

return replayWindow;
})
.Replay(1); // remember the latest window
}

IDisposable Connect()
{
return _source.Connect();
}

IDisposable Subscribe(IObserver<T> observer)
{
return _source
.Concat()
.Subscribe(observer);
}
}

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}

关于c# - 通过 observable 限制重播缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27312468/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com