gpt4 book ai didi

c# - 如何加入多个IObservable序列?

转载 作者:太空狗 更新时间:2023-10-29 18:33:06 26 4
gpt4 key购买 nike

        var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);

打印
0 - 5
1 - 6
2 - 7
...

相反,我想加入相同的值
5 - 5
6 - 6
7 - 7
8 - 8
...

这是合并 100 个有序异步序列问题的简化示例。加入两个 IEnumerable 非常容易,但我找不到在 Rx 中做这样的事情的方法。有什么想法吗?

更多关于输入和我正在努力实现的目标。基本上,整个系统是一个实时管道,具有多个状态机(聚合器、缓冲区、平滑过滤器等),通过 fork-join 模式连接。 RX 是否适合实现这些东西?每个输入都可以表示为

public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}

每个输入数据位在到达时都带有时间戳,因此所有事件都自然地按其连接键(时间戳)排序。当事件通过管道时,它们会被 fork 和加入。连接需要通过时间戳关联并按预定义的顺序应用。例如,join(a,b,c,d) => join(join(join(a,b),c),d).

编辑以下是我可以匆忙想出的。希望有一个基于现有 Rx 运算符的更简单的解决方案。

static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);

var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();

left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);

while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});

right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);

while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});

return Disposable.Empty;
});

最佳答案

GroupBy可以做你需要的。似乎您对项目何时“加入”没有时间限制,您只需要相似的项目以某种方式组合在一起。

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15))
.GroupBy(k => k)
.Subscribe( go => go.Count().Where(cnt => cnt > 1)
.Subscribe(cnt =>
Console.WriteLine("Key {0} has {1} matches", go.Key, cnt)));

关于上述两点需要注意的是,Merge 具有以下重载,因此您对拥有数百个连接流的请求不会出现问题:

Merge<TSource>(params IObservable<TSource>[] sources);
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources);
Merge<TSource>(this IObservable<IObservable<TSource>> source);

此外,GroupBy返回 IObservable<IGroupedObservable<TKey, TSource>>这意味着您可以对每个组以及每个组的每个新成员的加入使用react - 无需等到全部完成。

关于c# - 如何加入多个IObservable序列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4911465/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com