- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
如何清除 ReplaySubject
上的缓冲区?
我需要定期清除缓冲区(在我的例子中作为一天结束的事件)以防止 ReplaySubject
不断增长并最终吃掉所有内存。
理想情况下,我希望保持相同的 ReplaySubject
,因为客户端订阅仍然很好。
最佳答案
ReplaySubject
不提供清除缓冲区的方法,但有几种重载以不同方式限制其缓冲区:
TimeSpan
这是一个非常有趣的问题 - 我决定看看使用现有的主题和运算符(如这些非常坚固)。结果证明它相当简单。
我已经通过内存分析器运行它来检查它是否正确。调用 Clear()
来刷新缓冲区,否则它就像一个普通的无界 ReplaySubject
一样工作:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
遵守通常的规则(与任何 Subject
一样)并且不要同时调用此类上的方法 - 包括 Clear()
。如果需要,您可以简单地添加同步锁。
它通过在主 ReplaySubject 中嵌套一系列 ReplaySubjects 来工作。外层 ReplaySubject (_subjects
) 恰好包含一个内部 ReplaySubject (_currentSubject
) 的缓冲区,并在构造时填充。
OnXXX
方法调用 _currentSubject
ReplaySubject。
观察者订阅嵌套 ReplaySubjects 的串联投影(保存在 _concatenatedSubjects
中)。因为 _subjects
的缓冲区大小仅为 1,所以新订阅者只会获取最近的 ReplaySubject
之后的事件。
每当我们需要“清除缓冲区”时,现有的 _currentSubject
是 OnCompleted
并且一个新的 ReplaySubject 被添加到 _subjects
并成为新的 _currentSubject
。
按照@Brandon 的建议,我创建了一个 RollingReplaySubject
版本,它使用 TimeSpan
或输入流来指示缓冲区清除。我在这里为此创建了一个要点:https://gist.github.com/james-world/c46f09f32e2d4f338b07
关于c# - 如何清除 ReplaySubject 上的缓冲区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28945061/
我有一个外部热源在观察者可以订阅之前推送值。订阅后,迟到的观察者应该收到最新的值以及从那时起的每个值。为此,我使用了以下代码(相关行标有“ console.warn('!!!', n)); 这行不通(
如何将多个错误传递给 ReplaySubject? 当我调用 OnError 时,只有第一个异常被传递。我需要多次调用并传递所有错误/异常。 我在内部看到 RX 创建了一个 AnonymousSafe
我有一个 ReplaySubject,它使用 scan 运算符累积数据,每 10000 毫秒应重置一次。还有其他方法吗? 现在: let subject = new ReplaySubject();
我正在使用 ReactiveX/RxJS 版本。 假设我有一个 Rx.ReplaySubject,它每 2 秒发出一个包含 id 和带有值的数组的对象。我想减少这个值数组并得到它们的总和。 问题是 R
我有一个奇怪的用例,我需要跟踪所有以前发出的事件。 感谢 ReplaySubject,到目前为止它运行良好。在每个新订阅者上,此主题都会重新发出以前的每个事件。 现在,对于特定场景,我需要能够只提供最
下面是一个描述我正在尝试做的事情的片段。在我的应用程序中,我有一个贯穿始终的 replaysubject。在某个时候,我想获得主题发出的最后一个值,但是 last似乎不适用于 ReplaySubjec
我在 Angular 4 中使用的模板有问题。该模板实现了一个通知系统,您可以在其中添加新通知,但文档没有指定如何删除观察者 ReplaySubject 的元素。 模板将其实现为服务,如下所示: pr
我想用这样的加入: Observable.forkJoin( this.service1.dataSourceIsAReplaySubject, this.service2.dataS
我在我正在处理的 Silverlight 应用程序中遇到了一个绝对奇怪的行为。请看下面的代码: var replaySubject = new ReplaySubject(1); replay
根据我目前对 RxJS 中 ReplaySubject 的理解,下面的代码应该可以工作: import { ReplaySubject } from 'rxjs/ReplaySubject'; pri
我想创建一个冷可观察对象,它只会在有实际订阅时才开始执行昂贵的操作。 ReplaySubject 非常适合,除了我需要能够在实际订阅而不是创建可观察对象时启动昂贵的后台操作的部分。有办法吗?某种 on
我将状态保存在一个 ReplaySubject 中,它会重播状态的最后一个副本。从该状态,派生出其他 ReplaySubjects 来保持……好吧,派生状态。每个重放主题只需要保存它的最后计算状态/派
如何清除 ReplaySubject 上的缓冲区? 我需要定期清除缓冲区(在我的例子中作为一天结束的事件)以防止 ReplaySubject 不断增长并最终吃掉所有内存。 理想情况下,我希望保持相同的
我有一个 rxjs ReplaySubject,它会根据我所处的环境发出一个值或 null。意思是如果在某个环境中我进行服务调用并获取数据。如果我不在那个环境中,我只是在重播主题上调用 next(nu
我开发了一个具有两个 View 的组件。组件 A 有一个联系表单,组件 B 是“谢谢”页面。 组件A:您填写表格并提交。一旦响应到达,就会创建一个新的 ReplaySubject 值。用户将被路由到组
我需要一种方法来获取最近添加到符合特定条件的 ReplaySubject 的项目。下面的示例代码完成了我需要它做的事情,但感觉不是正确的方法: static void Main(string[] ar
我一直在尝试让我的流仅重播最新的值,但运气不佳。基本上,我有一个 replaySubject(1) 在我不能手动 .onNext 的地方。我想做的是接受它,将它映射到其他东西,然后添加一个初始值。 我
我有 CompositeSubscription ,并在那里添加带有 ReplaySubject 的 Subscription CompositeSubscription compositeSubs
我在 RxSwift 中有这个: func foo() -> Observable { let subject = RxSwift.ReplaySubject.create(bufferSiz
标题 如果您基本上有相同的问题并且您的上下文是 Angular,您可能需要阅读答案中的所有评论以获得更多上下文。 这个问题的简短版本 在做 let observe$ = someReplaySubje
我是一名优秀的程序员,十分优秀!