gpt4 book ai didi

c# - Rx,动态合并源

转载 作者:太空狗 更新时间:2023-10-30 00:53:10 26 4
gpt4 key购买 nike

我正在寻找一种不间断地动态合并数据源的方法。现实世界的场景是从多个来源提取类似的数据,而不考虑冗余信息。

为了简化代码,我用一个可以连续生成数据的简单数字生成器替换了更复杂的代码。这可以比作从多个外部服务器读取连续的数据流。

我希望能够合并这两个源并将结果(在适当的时候)打印到控制台,这部分效果很好。当我们终止这两个源并合并到另一个源时,事情就会按预期停止工作。在这种情况下,我们可以轻松地重新连接 mergedStreamObserver,但是,在更大的应用程序中,我们将不得不关注数据中的差距以及跟踪订阅了哪些观察者。

有解决办法吗?

// imports
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static void Main(string[] args) {
// base "stream of results" as we will want to randomly add (and terminate other sources)
IObservable<int> merged = Observable.Empty<int>();

// source 1
var tokenSource1 = new CancellationTokenSource();
IObservable<int> xs = Generate(tokenSource1, "A");

// to avoid generating the same numbers, which does happen,
// sleep some amount of time before calling generate again
Thread.Sleep(100);

// source 2
var tokenSource2 = new CancellationTokenSource();
IObservable<int> xt = Generate(tokenSource2, "B");

// odd queries
var seq1 = from n in xs where n % 2 == 1 select n;

// even queries
var seq2 = from n in xt where n % 2 == 0 select n;

// merge everything together
merged = merged.Merge<int>(seq1);
merged = merged.Merge<int>(seq2);

// observer for the merged "streams"
// NOTE: while this does not appear to be working correctly,
// remember you have 2 streams and 2 queries at work. It
// really is doing what it's expected to here.
IDisposable mergedStreamObserver = merged.Subscribe(str => { Console.WriteLine(str); });

// kill both sources
Console.ReadKey();

tokenSource1.Cancel();
tokenSource2.Cancel();

// start source and query for evens
// try to merge it
Console.ReadKey();

tokenSource2 = new CancellationTokenSource();
xt = Generate(tokenSource2, "B");

seq2 = from n in xt where n % 2 == 0 select n;

merged = merged.Merge(seq2);

// Nothing is happening because the merged stream was modified.
// How do we create a composite Observable from multiple sources
// and dynamically add/terminate those sources?

Console.ReadKey();

tokenSource2.Cancel();
mergedStreamObserver.Dispose();
Console.ReadKey();
}

static IObservable<int> Generate(CancellationTokenSource tokenSource, string name) {
Random random = new Random();

Action<int> observer = _ => { }; /* We could use null, but then at every invocation
* we'd have to copy to a local and check for null.
*/

Task.Factory.StartNew(() => {
while(!tokenSource.IsCancellationRequested) {
var t = random.Next(0, 100);
Console.WriteLine("From Generator {0}: {1}", name, t);

observer(t);

Thread.Sleep(1000);
}

Console.WriteLine("Random Generator Stopped");
}, tokenSource.Token);

return Observable.FromEvent<int>(
eh => observer += eh,
eh => observer -= eh);
}

最佳答案

使用主题,并在创建流之前订阅合并流:

var streams = new Subject<IObservable<int>>();
var mergedStreams = streams.Merge();
var mergedObserver = mergedStreams.Subscribe(...);

// now create your streams
...

// add them to the streams subject
streams.OnNext(seq1);
streams.OnNext(seq2);
...

streams.OnNext(seq3);
streams.OnNext(seq4);

...
// If we know there will be no more streams, tell the Subject...
streams.OnCompleted();

关于c# - Rx,动态合并源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17553093/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com