gpt4 book ai didi

c# - 如何制作一个只能订阅一次的轻量级 `Replay`算子?

转载 作者:行者123 更新时间:2023-12-04 17:22:16 26 4
gpt4 key购买 nike

在各种场合我都希望收到一个 Rx Replay 缓冲传入通知的运算符,在第一次订阅时同步重放其缓冲区,然后停止缓冲。此轻量级 Replay运营商应该只能为一个用户提供服务。可以找到此类运算符的一个用例 here ,在第一次订阅后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例:

var observable = Observable
.Interval(TimeSpan.FromMilliseconds(500))
.SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
.Take(800_000)
.Do(x =>
{
if (x % 100_000 == 0) Console.WriteLine(
$"{DateTime.Now:HH:mm:ss.fff} > " +
$"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
})
.Replay()
.AutoConnect(0);

await Task.Delay(2200);

Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");

// First subscription
await observable.Do(x =>
{
if (x % 100_000 == 0)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});

// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");
可观察对象总共生成 800,000 个值。 Replay机制立即连接到源,并在完成之前中途订阅。
输出:
16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000
订阅后内存使用量不断增长。这是意料之中的,因为所有值都被缓冲,并且在重放的 observable 的整个生命周期内都被缓冲。理想的行为是订阅后内存使用量急剧下降。传播缓冲值后应丢弃缓冲区,因为订阅后它没有用。此外,第二个订阅( await observable.Count() )应该会失败并返回 InvalidOperationException .我不想在它丢失了它的 Replay 之后再次订阅可观察对象。功能。
这是自定义的 stub ReplayOnce我正在尝试实现的运算符。有没有人知道如何实现它?
public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
return source.Replay(); // TODO: enforce the subscribe-once policy
}
顺便说一句,有一个相关的问题 here ,关于如何制作 Replay带有缓冲区的运算符,可以根据需要偶尔清空。我的问题有所不同,因为我希望在订阅后完全禁用缓冲区,并且不再开始增长。

最佳答案

我想出了一个实现 ReplayOnce运算符,基于多播自定义 ReplayOnceSubject<T> .该主题最初由 ReplaySubject<T> 支持, 用普通的 Subject<T> 切换在第一次(也是唯一允许的)订阅期间:

public static IConnectableObservable<T> ReplayOnce<T>(
this IObservable<T> source)
{
return source.Multicast(new ReplayOnceSubject<T>());
}

private class ReplayOnceSubject<T> : ISubject<T>
{
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();

public void OnNext(T value)
{
lock (_locker) _subject.OnNext(value);
}

public void OnError(Exception error)
{
lock (_locker) _subject.OnError(error);
}

public void OnCompleted()
{
lock (_locker) _subject.OnCompleted();
}

public IDisposable Subscribe(IObserver<T> observer)
{
lock (_locker)
{
if (_subject is ReplaySubject<T> replaySubject)
{
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
_subject = subject;
return subscription;
}
else
throw new InvalidOperationException("Already subscribed.");
}
}
}
线路 replaySubject.Subscribe(subject)确保不仅缓冲的值将传播到观察者,而且任何可能的 OnError/ OnCompleted通知。订阅后 ReplaySubject不再被引用,并且有资格进行垃圾收集。

关于c# - 如何制作一个只能订阅一次的轻量级 `Replay`算子?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65494884/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com