gpt4 book ai didi

c# - .Net RX : tracking progress of parallel execution

转载 作者:行者123 更新时间:2023-11-30 16:26:36 25 4
gpt4 key购买 nike

我需要并行执行多个长时间运行的操作,并希望以某种方式报告进度。从我最初的研究来看,IObservable 似乎适合这个模型。这个想法是我调用一个返回 IObservable of int 的方法,其中报告 int 完成百分比,并行执行在退出方法后立即开始,这个 observable 必须是一个热 observable 以便所有订阅者在特定时间点了解相同的进度信息,例如迟到的订阅者可能只知道整个执行已经完成,没有更多的进度可以跟踪。

我发现最接近这个问题的方法是使用 Observable.ForkJoin 和 Observable.Start,但我无法理解如何使它们成为我可以从方法返回的单个可观察对象。

请分享您关于如何实现它的想法,或者也许还有另一种使用 .Net RX 解决此问题的方法。

最佳答案

要制作热可观察对象,我可能会从使用 BehaviorSubject 作为返回值和操作报告进度的方式开始。如果您只想要示例,请跳到最后。此答案的其余部分解释了这些步骤。

为了这个答案,我假设您的长时间运行的操作没有自己的异步调用方式。如果他们这样做了,下一步可能会有所不同。接下来要做的是使用 IScheduler 将工作发送到另一个线程。如果需要,您可以允许调用者通过将调度程序作为参数的重载来选择工作发生的位置(在这种情况下,不会选择默认调度程序的重载)。 IScheduler.Scheduler 有很多重载,其中有几个是扩展方法,因此您应该仔细查看它们,看看哪个最适合您的情况;我在这里使用的是只需要一个 Action 的 on。如果您有多个可以并行运行的操作,则可以多次调用 scheduler.Schedule

其中最困难的部分可能是确定在任何给定时间点的进展情况。如果同时进行多项操作,您可能需要跟踪已完成的操作数量以了解当前进度。根据您提供的信息,我不能比这更具体了。

最后,如果您的操作是可取消的,您可能需要将 CancellationToken 作为参数。您可以使用它在操作开始之前在调度程序的队列中取消操作。如果您正确编写操作代码,它也可以使用 token 进行取消。

IObservable<int> DoStuff(/*args*/, 
CancellationToken cancel,
IScheduler scheduler)
{
BehaviorSubject<int> progress;
//if you don't take it as a parameter, pick a scheduler
//IScheduler scheduler = Scheduler.ThreadPool;

var disp = scheduler.Schedule(() =>
{
//do stuff that needs to run on another thread

//report progres
porgress.OnNext(25);
});
var disp2 = scheduler.Schedule(...);

//if the operation is cancelled before the scheduler has started it,
//you need to dispose the return from the Schedule calls
var allOps = new CompositeDisposable(disp, disp2);
cancel.Register(allOps.Dispose);

return progress;
}

关于c# - .Net RX : tracking progress of parallel execution,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8693795/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com