gpt4 book ai didi

c# - 围绕一组不断变化的依赖可观察对象创建一个可观察对象

转载 作者:行者123 更新时间:2023-11-30 22:00:17 26 4
gpt4 key购买 nike

下面的代码片段是我尝试创建以下功能:

  • 创建一个订阅主题集合的可观察序列
  • 当集合中的一个主题产生一个值时,序列结束,调用一个方法返回一组新的主题并从 1 重新开始。
  • 当对外部可观察对象的订阅被处理掉时,整个事情就停止了

关于我的实现的问题:

  • 为什么它可以使用 subjectsSub.SelectMany(x => x).Merge() 而不是使用 subjectsSub.Merge()? (我本来希望后一种方法起作用)
  • 在庞大的 Rx 功能库中是否有更简单、更优雅的解决方案?

更新:这个示例实际上是从 RxJS-Typescript 向后移植的,目的是让更广泛的受众了解这个问题。原始版本在使用 Javascript 的单线程浏览器环境中运行的事实应该更清楚为什么这种“可观察的捕获”可能有效(它确实有效并且没有像弄乱 RxJs 内部结构这样的肮脏黑客)。


    class Program
{
private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();

private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
{
// wire some traps for capturing any observables "touched" by expression
expression();

// return observables touched by expression (not in this example of course)
if (observableDependencies.Count > 0)
return observableDependencies.Dequeue();

return new[] {Observable.Never<Unit>()}; // keep going
}

private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
{
return Observable.Create<Unit>(observer =>
{
var isComplete = new Subject<Unit>();
var isAborted = false;

var disp = Scheduler.Default.Schedule(self =>
{
Console.WriteLine("** Next iteration at {0}", sw.Elapsed);

capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
{
observer.OnNext(Unit.Default);

// self-destruct
isComplete.OnNext(Unit.Default);
},
() =>
{
Console.WriteLine("completed");

if (!isAborted)
self();
});

capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
});

return new CompositeDisposable(Disposable.Create(() =>
{
isAborted = true;

// self-destruct
isComplete.OnNext(Unit.Default);
}), disp);
});
}

private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();

observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
});

observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
Observable.Return(10).Select(x => Unit.Default)
});

observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});

var capturedObservables = new Subject<IObservable<Unit>[]>();
var obs = CreateObservable(capturedObservables, sw);

var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
Console.ReadLine();

disp.Dispose();
Console.ReadLine();
}
}

最佳答案

要回答您的第一个问题,SelectMany 是必需的,因为您有一个 3 级深度可观察对象:Observable 数组的主题。 Merge 仅展平一层。 SelectMany 只是Select + Merge 的简写。所以 SelectMany.Merge 正在应用 2 个展平运算符,这正是您所需要的。

第二个答案...看起来你可以只使用 Merge + FirstOrDefault + Defer + Repeat甚至不使用主题:

var disp = Observable
.Defer(() => EvaluateExpressionAndCaptureTouchedObservables()
.Merge()
.FirstOrDefault(Unit.Default))
.Repeat()
.Subscribe(...);

Defer 每次订阅时运行捕获函数

Merge 以展平 observables 数组

FirstOrDefault 在任何可观察对象产生值后立即结束流。如果它们全部完成但没有产生值,那么它会产生一个您可以观察到的 Unit.Default

Repeat 在结束时重新订阅(由于 FirstOrDefault),这会触发另一次捕获(由于 Defer)。

转换回 TypeScript 显然很简单...

关于c# - 围绕一组不断变化的依赖可观察对象创建一个可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28691268/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com