gpt4 book ai didi

c# - 使用 Rx.NET 的信号过滤器

转载 作者:太空宇宙 更新时间:2023-11-03 12:57:15 27 4
gpt4 key购买 nike

我想在 Rx.NET 中实现一个信号过滤器,它从一组初始系数开始。随着时间的流逝,必须根据观察到的数据值的快照重新计算滤波器系数。

这是一个小原型(prototype),展示了它应该如何工作。为简单起见,我选择滤波器长度和用于重新计算滤波器系数的历史数据值的数量相同(示例中为 3)。

该示例使用 bufferedAt10 中的副作用来重新计算系数。这不是我想要的。

在实际应用中,数据以不规则的时间步长传入,系数应每天或每周在特定时间更新一次。我可以很容易地使缓冲区更长,但是我怎样才能让系统运行并以一种干净的功能方式从观察者那里更改滤波器系数?

       // create a hot obvervable to produce data
const int bufLen = 3;
var rng = new Random();
var period = TimeSpan.FromSeconds(0.5);
var observable = Observable.Interval(period)
.Select(i => new {Time = DateTime.Now, Price = rng.NextDouble()})
.Do(e => Console.WriteLine("original : {0}", e))
.Publish();
observable.Connect();

Console.WriteLine("Press any key to subscribe");
Console.ReadKey();

// buffer of length bufLen used for filter calculation (every tick) and filter
// coefficient update (at a lower frequency)
var buffered = observable.Buffer(bufLen, 1);

// apply the signal filter with coefficients in `coeff`
var coeff = new List<Double>() {1.0, 1.0, 1.0}; // these will be updated on the way from new data
var filtered = buffered.Select(e =>
{
var f = 0.0;
for (var i = 0; i < bufLen; i++)
{
f += e[i].Price*coeff[i]; // apply the filter with coefficients `coeff`
}
return new {Time = DateTime.Now, FilteredPrice = f};
});

var s1 = filtered.Subscribe(e => Console.WriteLine("filtered : {0} (coeff {1},{2},{3})", e, coeff[0], coeff[1], coeff[2]));

// recalculate the filter coefficients say every 10 seconds
var bufferedAt10 = buffered.DistinctUntilChanged(e => (e[bufLen - 1].Time.TimeOfDay.Seconds / 10) * 10);

var s2 = bufferedAt10.Subscribe(e =>
{
Console.WriteLine("recalc of coeff : {0}", e[bufLen - 1].Time);
for (var i = 0; i < bufLen; i++)
{
// a prototypical function that takes the buffer and uses it to "recalibrate" the filter coefficients
coeff[i] = coeff[i] + e[bufLen - 1 - i].Price;
}
Console.WriteLine("updated coeffs to {0},{1},{2}", coeff[0], coeff[1], coeff[2]);
});

感谢任何好的建议。

最佳答案

以下内容未经测试,但我认为它应该涵盖您的需要。其背后的想法是,您将流发散,对一个流进行系数更新,然后使用 WithLatestFrom 将它们重新组合在一起。我使用 SampleScan 来执行周期“调整”。您的自定义时间戳可以通过使用 TimeStamp 运算符来完成。您还可以考虑将 Publish 向下移动到 Buffer 之后,否则您将有两个流生成缓冲区,但这取决于您。

const int bufLen = 3;
var rng = new Random();
var period = TimeSpan.FromSeconds(0.5);
var observable = Observable.Interval(period)
.Select( => rng.NextDouble())
.Publish();
observable.Connect();

Console.WriteLine("Press any key to subscribe");
Console.ReadKey();

var buffered = observable.Buffer(bufLen, 1);

var seed = new [] {1.0, 1.0, 1.0};

var coefficients = buffered
//Samples for a new value every 10 seconds
.Sample(TimeSpan.FromSeconds(10))
//Updates the seed value and emits it after every update
.Scan(seed,
//Use good old fashion Linq
(coeff, delta) => coeff.Zip(delta.Reverse(),
(c, d) => c + d.Price)
.ToArray()
);

//Emits a new value everytime buffer emits, and combines it with the latest
//values from the coefficients Observable
//Kick off coefficients with the seed otherwise you need to wait 10 seconds
//for the first value.
buffer.WithLatestFrom(coefficients.StartWith(seed), (e, coeff) => {
return e.Zip(coeff, (x, c) => x.Price * c).Sum();
})
.TimeStamp()
.Subscribe(e => Console.WriteLine("filtered : {0}", e);

关于c# - 使用 Rx.NET 的信号过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33486397/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com