- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想在 Rx.NET 中实现一个信号过滤器,它从一组初始系数开始。随着时间的流逝,必须根据观察到的数据值的快照重新计算滤波器系数。
这是一个小原型(prototype),展示了它应该如何工作。为简单起见,我选择滤波器长度和用于重新计算滤波器系数的历史数据值的数量相同(示例中为 3)。
该示例使用 bufferedAt10
中的副作用来重新计算系数。这不是我想要的。
在实际应用中,数据以不规则的时间步长传入,系数应每天或每周在特定时间更新一次。我可以很容易地使缓冲区更长,但是我怎样才能让系统运行并以一种干净的功能方式从观察者那里更改滤波器系数?
// create a hot obvervable to produce data
const int bufLen = 3;
var rng = new Random();
var period = TimeSpan.FromSeconds(0.5);
var observable = Observable.Interval(period)
.Select(i => new {Time = DateTime.Now, Price = rng.NextDouble()})
.Do(e => Console.WriteLine("original : {0}", e))
.Publish();
observable.Connect();
Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
// buffer of length bufLen used for filter calculation (every tick) and filter
// coefficient update (at a lower frequency)
var buffered = observable.Buffer(bufLen, 1);
// apply the signal filter with coefficients in `coeff`
var coeff = new List<Double>() {1.0, 1.0, 1.0}; // these will be updated on the way from new data
var filtered = buffered.Select(e =>
{
var f = 0.0;
for (var i = 0; i < bufLen; i++)
{
f += e[i].Price*coeff[i]; // apply the filter with coefficients `coeff`
}
return new {Time = DateTime.Now, FilteredPrice = f};
});
var s1 = filtered.Subscribe(e => Console.WriteLine("filtered : {0} (coeff {1},{2},{3})", e, coeff[0], coeff[1], coeff[2]));
// recalculate the filter coefficients say every 10 seconds
var bufferedAt10 = buffered.DistinctUntilChanged(e => (e[bufLen - 1].Time.TimeOfDay.Seconds / 10) * 10);
var s2 = bufferedAt10.Subscribe(e =>
{
Console.WriteLine("recalc of coeff : {0}", e[bufLen - 1].Time);
for (var i = 0; i < bufLen; i++)
{
// a prototypical function that takes the buffer and uses it to "recalibrate" the filter coefficients
coeff[i] = coeff[i] + e[bufLen - 1 - i].Price;
}
Console.WriteLine("updated coeffs to {0},{1},{2}", coeff[0], coeff[1], coeff[2]);
});
感谢任何好的建议。
最佳答案
以下内容未经测试,但我认为它应该涵盖您的需要。其背后的想法是,您将流发散,对一个流进行系数更新,然后使用 WithLatestFrom
将它们重新组合在一起。我使用 Sample
和 Scan
来执行周期“调整”。您的自定义时间戳可以通过使用 TimeStamp
运算符来完成。您还可以考虑将 Publish
向下移动到 Buffer
之后,否则您将有两个流生成缓冲区,但这取决于您。
const int bufLen = 3;
var rng = new Random();
var period = TimeSpan.FromSeconds(0.5);
var observable = Observable.Interval(period)
.Select( => rng.NextDouble())
.Publish();
observable.Connect();
Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var buffered = observable.Buffer(bufLen, 1);
var seed = new [] {1.0, 1.0, 1.0};
var coefficients = buffered
//Samples for a new value every 10 seconds
.Sample(TimeSpan.FromSeconds(10))
//Updates the seed value and emits it after every update
.Scan(seed,
//Use good old fashion Linq
(coeff, delta) => coeff.Zip(delta.Reverse(),
(c, d) => c + d.Price)
.ToArray()
);
//Emits a new value everytime buffer emits, and combines it with the latest
//values from the coefficients Observable
//Kick off coefficients with the seed otherwise you need to wait 10 seconds
//for the first value.
buffer.WithLatestFrom(coefficients.StartWith(seed), (e, coeff) => {
return e.Zip(coeff, (x, c) => x.Price * c).Sum();
})
.TimeStamp()
.Subscribe(e => Console.WriteLine("filtered : {0}", e);
关于c# - 使用 Rx.NET 的信号过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33486397/
我用 Cocoapods 创建了一个简单的项目并安装了 RxSwift 和 RxCocoa。 我写了这段代码: import UIKit import RxSwift class ViewContro
我对 Rx 真的很陌生,只是想知道 subscribeOn 的顺序如何影响 Observable //This will not print anything Observable.just("wha
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。 例子 observable A 每秒发出一次“滴答声” observable B 应该每 5 秒
我有以下流。 Observable.just(Unit) // execute immediately .mergeWith(tryAgainRelay) // execute again w
我可以看到 RX 适用于 Android 和 UI 事件处理。我很难看到 RX 在后端提供了什么好处。 RX Java 是为后端处理而设计的,还是这个概念太过分了? 最佳答案 实际上,RxJava 最
我有一个关于什么是更好的 RxJava 模式以保持可观察状态的问题。 为简单起见,假设我们有一个 StateManager需要跟踪系统中某个状态(假设它是一个简单的 bool 标志)并以可观察的方式公
我遇到了以下问题。我有一个从具有 isMember 属性的服务器请求的基本对象。如果该属性为 true,我想向端点 1、2 和 3 发出请求。 如果该属性为 false,我只想向端点 1 发出请求。
我正在使用 RxJava,但也许能够翻译另一个实现的答案。 我有一个 Observable 发出一系列项目,我想将它们分成 10 组。如下所示: observable .buffer(10)
有没有办法计算 RxAndroid 中流中已处理的元素数量? 我有类似的事情: Observable.fromArray(new String[]{"these", "are", "my", "val
如果我的客户端断开连接(错误)超过 10 秒,我会尝试显示一个弹出窗口。但是,当重新连接时,我也会关闭弹出窗口(true)。如果为真,我需要立即关闭弹出窗口。 我认为我需要做的是根据值(假)进行去抖,
收到 onNext() 后如何自动退订? 现在我使用这个代码: rxObservable .compose(bindToLifecycle()) // unsubscribe automaticall
我在 Ettus x310 上有一个简单的 C++ 测试程序,以前可以用,但现在不行了。我试图简单地设置单个 USRP 的两个 channel 的两个中心频率。当我尝试在第二个 channel 上设置
我有一个返回 Observable 的网络调用,我有另一个网络调用,它不是依赖于第一个 Observable 的 rx,我需要以某种方式转换这一切都与 Rx 相关。 Observable respon
我正在尝试根据 Rxjava 中的某些条件创建组列表。 以下是我的回复: { "dates":[ { "date":18, "value":
RX 6800 XT是最近很强的一款显卡,让很多的用户都很喜爱,详细还有很多没有入手的用户对他不是很了解吧,下面就带来了RX 6800 XT评测、跑分、价格、参数、图片,快来一起看看吧。 RX
我有一个重复元素列表,比如: Observable source = Observable.just("A", "B", "A", "C", "C", "A"); 我想按它们的值以及它们出现的次数对它
如果我将 RX-main 包添加到 WPF 应用程序,我会安装以下依赖包: 接收接口(interface) 接收核心 Rx-Linq Rx 平台服务 主要接收 添加 Rx-main 后,可以安装两个与
com.myapp.test.debug E/MessageQueue-JNI:rx.exceptions.OnErrorNotImplementedException 在 rx.Observable
这个问题在这里已经有了答案: RxJava operator that emits if different from last (1 个回答) 关闭 5 年前。 我有一个 bool 型 Obser
首先,我在两个 shell 中同时运行 vnstat -tr -i eth0 和 iftop。奇怪的是,vnstat 中的 rx 比 iftop 中的峰值 rx 大得多。我该如何解决这个问题? vns
我是一名优秀的程序员,十分优秀!