gpt4 book ai didi

c# - 有条件地组合两个 Rx 流

转载 作者:太空宇宙 更新时间:2023-11-03 12:51:32 26 4
gpt4 key购买 nike

我正在尝试使用 Rx 实现一个场景,其中我有两个热 Observable。流 1 和流 2。基于流 1 的数据,我需要启动流 2 或停止流 2。然后使用 CombineLatest 将两个流数据合并为一个。下面是我能够想出的代码。

  1. 有没有更好的方法来实现它?

  2. 我怎样才能使它更通用,就像我将拥有 Stream 1 然后是 Stream 2 .. n 对于来自 2 .. n 的每个流都有条件 Condition 2 .. n 它利用 Stream 1 的数据来检查其他流是否需要启动,然后以 CombineLatest 方式合并所有数据

代码:

        IDisposable TfsDisposable = null;

// stream 1
var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));


// stream 2
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish();


var observerHot = hotObs.Do(a =>
{
// Based on Condition to start the second stream
if (ConditionToStartStream2)
{
TfsDisposable = TfsDisposable ?? hotObs2.Connect();
}
})
.Do(a =>
{
// Based on condition 2 stop the second stream
if (ConditionToStopStream2)
{
TfsDisposable?.Dispose();
TfsDisposable = null;
}
}).Publish();


// Merge both the stream using Combine Latest
var finalMergedData = hotObs.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); });

// Display the result
finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a); });

// Start the first hot observable
observerHot.Connect();

最佳答案

试一试:

var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0));
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(0.3));

var query =
hotObs2.Publish(h2s =>
hotObs.Publish(hs =>
hs
.Select(a => a % 7 == 0 ? h2s : Observable.Empty<long>())
.Switch()
.Merge(hs)));

这会获取两个可观察对象并使用在 lambda 中发布它们的重载来发布它们。它使它们在 lambda 范围内变热,并避免了管理对 .Connect() 的调用的需要。 .

然后我只是执行条件检查(在本例中是 a 偶数)然后返回另一个流,如果不返回空流。

然后 .Switch转动 IObservable<IObservable<long>>进入 IObservable<long>仅从最新的内部可观察值中获取值。

最后它与原来的 hs 合并了流。

使用上面的示例代码,我得到以下输出:

0 1 2 3 1 2 3 4 5 6 7 23 24 25 8 

关于c# - 有条件地组合两个 Rx 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35361436/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com