gpt4 book ai didi

c# - ObserveOn 和 SubscribeOn - 工作完成的地方

转载 作者:行者123 更新时间:2023-12-02 00:42:21 25 4
gpt4 key购买 nike

基于阅读这个问题:What's the difference between SubscribeOn and ObserveOnObserveOn 设置代码在 Subscribe 处理程序中的位置被执行:stream.Subscribe(_ => { // this code here });SubscribeOn 方法设置在哪个线程上完成流的设置。
我了解到,如果未明确设置这些,则使用 TaskPool。
现在我的问题是,假设我做这样的事情:

Observable.Interval(new Timespan(0, 0, 1))
.Where(t => predicate(t))
.SelectMany(t => lots_of(t))
.ObserveOnDispatcher()
.Subscribe(t => some_action(t));
Where predicateSelectMany lots_of 在哪里执行,假设 0x251812431431 正在执行调度程序?

最佳答案

有很多关于 SubscribeOnObserveOn 的误导性信息。

概括

  • SubscribeOn 拦截调用的 IObservable<T> 单方法,该方法是Subscribe,以及由Dispose返回的IDisposable handle 调用Subscribe
  • ObserveOn 截取调用的 IObserver<T> 的方法,这是OnNextOnCompletedOnError
  • 这两种方法都会导致在指定的调度程序上进行相应的调用。

  • 分析与演示

    该声明

    ObserveOn sets where the code in the Subscribe handler is executed:



    比帮助更令人困惑。您所指的“订阅处理程序”实际上是一个 OnNext 处理程序。请记住,的 SubscribeIObservable方法接受一个 IObserverOnNextOnCompletedOnError方法,但扩展方法提供了接受lambda表达式, build IObserver实现您的方便过载。

    不过,让我适本地使用这个词;我认为“订阅处理程序”是在调用 Subscribe 时调用的 observable 中的代码。这样,上面的描述更接近于 SubscribeOn 的目的。

    订阅
    SubscribeOn 导致 observable 的 Subscribe 方法在指定的调度程序或上下文上异步执行。当您不想从正在运行的任何线程中对 observable 调用 Subscribe 方法时使用它 - 通常是因为它可以长时间运行并且您不想阻塞调用线程。

    当您调用 Subscribe 时,您正在调用一个可能是一长串 observable 的一部分的 observable。这只是 SubscribeOn 应用于它所影响的可观察量。现在可能会出现这样的情况,即链中的所有 observable 都将立即在同一个线程上订阅——但并非必须如此。以 Concat 为例——它只在前一个流完成后订阅每个连续的流,通常这将发生在前一个流调用 OnCompleted 的任何线程上。

    所以 SubscribeOn 位于您对 Subscribe 的调用和您订阅的可观察对象之间,拦截调用并使其异步。

    它还影响订阅的处理。 Subscribe 返回一个 IDisposable 句柄,用于取消订阅。 SubscribeOn 确保在提供的调度程序上调度对 Dispose 的调用。

    在试图理解 SubscribeOn 的作用时,一个常见的混淆点是 Subscribe 的 observable 处理程序很可能会调用 OnNext 、 0x251812231343214141 上的 0x2518122313432141 线程。但是,其目的不是影响这些调用。在 OnCompleted 方法返回之前完成流的情况并不少见。例如, OnError 就是这样做的。让我们来看看。

    如果使用我写的 Spy方法,运行如下代码:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.Subscribe();
    Console.WriteLine("Subscribe returned");

    你得到这个输出(线程 id 当然可能会有所不同):
    Calling from Thread: 1
    Return: Observable obtained on Thread: 1
    Return: Subscribed to on Thread: 1
    Return: OnNext(1) on Thread: 1
    Return: OnCompleted() on Thread: 1
    Return: Subscription completed.
    Subscribe returned

    您可以看到整个订阅处理程序在同一个线程上运行,并在返回之前完成。

    让我们使用 Subscribe 异步运行它。我们将监视 Observable.Return observable 和 SubscribeOn observable:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
    Console.WriteLine("Subscribe returned");

    这个输出(我添加的行号):
    01 Calling from Thread: 1
    02 Return: Observable obtained on Thread: 1
    03 SubscribeOn: Observable obtained on Thread: 1
    04 SubscribeOn: Subscribed to on Thread: 1
    05 SubscribeOn: Subscription completed.
    06 Subscribe returned
    07 Return: Subscribed to on Thread: 2
    08 Return: OnNext(1) on Thread: 2
    09 SubscribeOn: OnNext(1) on Thread: 2
    10 Return: OnCompleted() on Thread: 2
    11 SubscribeOn: OnCompleted() on Thread: 2
    12 Return: Subscription completed.

    01 - main 方法在线程 1 上运行。

    02 - 在调用线程上评估 Return observable。我们只是在这里得到 SubscribeOn,还没有订阅。

    03 - 在调用线程上评估 Return observable。

    04 - 现在我们终于调用了 IObservableSubscribeOn 方法。

    05 - Subscribe 方法异步完成...

    06 - ... 线程 1 返回到 main 方法。 这是 SubscribeOn 的效果!

    07 - 同时, SubscribeOn 将默认调度程序上的调用安排到 Subscribe 。这是在线程 2 上接收的。

    08 - 正如 SubscribeOn 所做的那样,它在 Return 线程上调用 Return ......

    09 - 和 OnNext 现在只是一个通过。

    10,11 - Subscribe 相同

    12 - 最后一个 SubscribeOn 订阅处理程序完成。

    希望这能澄清 OnCompleted 的目的和效果!

    观察

    如果您认为 Return作为该调用传递的 SubscribeOn方法拦截到一个不同的线程,然后 SubscribeOn做相同的工作,但对于 SubscribeObserveOnOnNext电话。

    回想一下我们最初的例子:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.Subscribe();
    Console.WriteLine("Subscribe returned");

    这给出了这个输出:
    Calling from Thread: 1
    Return: Observable obtained on Thread: 1
    Return: Subscribed to on Thread: 1
    Return: OnNext(1) on Thread: 1
    Return: OnCompleted() on Thread: 1
    Return: Subscription completed.
    Subscribe returned

    现在让我们改变它以使用 OnCompleted :
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
    Console.WriteLine("Subscribe returned");

    我们得到以下输出:
    01 Calling from Thread: 1
    02 Return: Observable obtained on Thread: 1
    03 ObserveOn: Observable obtained on Thread: 1
    04 ObserveOn: Subscribed to on Thread: 1
    05 Return: Subscribed to on Thread: 1
    06 Return: OnNext(1) on Thread: 1
    07 ObserveOn: OnNext(1) on Thread: 2
    08 Return: OnCompleted() on Thread: 1
    09 Return: Subscription completed.
    10 ObserveOn: Subscription completed.
    11 Subscribe returned
    12 ObserveOn: OnCompleted() on Thread: 2

    01 - main 方法正在线程 1 上运行。

    02 - 和以前一样,在调用线程上评估 OnError observable。我们只是在这里得到 ObserveOn,还没有订阅。

    03 - Return observable 也在调用线程上进行评估。

    04 - 现在我们再次在调用线程上订阅 IObservable observable...

    05 - ... 然后将调用传递给 ObserveOn observable。

    06 - 现在 ObserveOn 在其 Return 处理程序中调用 Return

    07 - 这是 OnNext 的效果。 我们可以看到 Subscribe 在线程 2 上异步调度。

    08 - 同时 ObserveOn 在线程 1 上调用 OnNext ...

    09 - Return 的订阅处理程序完成...

    10 - 然后是 OnCompleted 的订阅处理程序...

    11 - 所以控制返回到主方法

    12 - 同时, Return 已经将 ObserveOnObserveOn 调用到线程 2。这可能在 09-11 期间的任何时间发生,因为它是异步运行的。碰巧它现在终于被调用了。

    典型的用例是什么?

    当您需要 Return 到一个长时间运行的 observable 并希望尽快离开调度程序线程时,您最常会在 GUI 中看到 OnCompleted 使用 - 也许是因为您知道它是那些在订阅中完成所有工作的 observable 之一处理程序。将它应用在 observable 链的末尾,因为这是您订阅时调用的第一个 observable。

    当您想要确保 SubscribeOnSubscribeObserveOn 调用被编码回调度程序线程时,您将最常看到 OnNext 在 GUI 中使用。将它应用在可观察链的末尾以尽可能晚地转换回来。

    希望您能看到问题的答案是 OnCompleted 不会对 OnErrorObserveOnDispatcher 执行的线程产生任何影响 - 这一切都取决于调用它们的线程流!流的订阅处理程序将在调用线程上被调用,但是在不知道 Where 是如何实现的情况下,不可能说 SelectManyWhere 将在哪里运行。

    生命周期超过 Subscribe 调用的 Observable

    到目前为止,我们一直只关注 SelectManystreamObservable.Return 处理程序中完成其流。这不是非典型的,但流比 Return 处理程序存活时间更长的情况同样常见。以 Subscribe 为例:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.Subscribe();
    Console.WriteLine("Subscribe returned");

    这将返回以下内容:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    Timer: Subscribed to on Thread: 1
    Timer: Subscription completed.
    Subscribe returned
    Timer: OnNext(0) on Thread: 2
    Timer: OnCompleted() on Thread: 2

    您可以清楚地看到订阅完成,然后 SubscribeObservable.Timer 稍后在不同的线程上被调用。

    请注意, OnNextOnCompleted 的组合不会对哪个线程或调度程序 SubscribeOn 选择调用 0x25181223134312411 和 481312411 上的 0x25181223134312411 产生任何影响

    当然,您可以使用 ObserveOn 来确定 Timer 线程:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
    Console.WriteLine("Subscribe returned");

    (我在这里特意改成 OnNext 以防止在 OnCompleted 发生与 SubscribeOn 相同的线程池线程的情况下发生混淆)

    给予:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    SubscribeOn: Observable obtained on Thread: 1
    SubscribeOn: Subscribed to on Thread: 1
    SubscribeOn: Subscription completed.
    Subscribe returned
    Timer: Subscribed to on Thread: 2
    Timer: Subscription completed.
    Timer: OnNext(0) on Thread: 3
    SubscribeOn: OnNext(0) on Thread: 3
    Timer: OnCompleted() on Thread: 3
    SubscribeOn: OnCompleted() on Thread: 3

    在这里,您可以清楚地看到线程 (1) 上的主线程在其 Subscribe 调用后返回,但是 NewThreadScheduler 订阅获得了自己的线程 (2),但是 Timer 和 0x2518131431 上的 0x2518131431 调用了。

    现在对于 SubscribeOn ,让我们将代码更改为(对于以下代码,请使用 nuget 包 rx-wpf):
    var dispatcher = Dispatcher.CurrentDispatcher;
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
    Console.WriteLine("Subscribe returned");

    这段代码有点不同。第一行确保我们有一个调度程序,我们还引入了 Subscribe - 这就像 Timer ,除了它指定我们应该使用 OnNext 的任何线程 0x2518431421 评估

    此代码提供以下输出:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    ObserveOn: Observable obtained on Thread: 1
    ObserveOn: Subscribed to on Thread: 1
    Timer: Subscribed to on Thread: 1
    Timer: Subscription completed.
    ObserveOn: Subscription completed.
    Subscribe returned
    Timer: OnNext(0) on Thread: 2
    ObserveOn: OnNext(0) on Thread: 1
    Timer: OnCompleted() on Thread: 2
    ObserveOn: OnCompleted() on Thread: 1

    请注意,调度程序(和主线程)是线程 1。 OnCompleted 仍在其选择的线程(2)上调用 ObserveOnObserveOnDispatcher - 但 0x25181314131313131313

    另请注意,如果我们要阻塞调度程序线程(例如 ObserveOn ),您会看到 DispatcherScheduler 会阻塞(此代码在 LINQPad 主方法中效果最佳):
    var dispatcher = Dispatcher.CurrentDispatcher;
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
    Console.WriteLine("Subscribe returned");
    Console.WriteLine("Blocking the dispatcher");
    Thread.Sleep(2000);
    Console.WriteLine("Unblocked");

    你会看到这样的输出:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    ObserveOn: Observable obtained on Thread: 1
    ObserveOn: Subscribed to on Thread: 1
    Timer: Subscribed to on Thread: 1
    Timer: Subscription completed.
    ObserveOn: Subscription completed.
    Subscribe returned
    Blocking the dispatcher
    Timer: OnNext(0) on Thread: 2
    Timer: OnCompleted() on Thread: 2
    Unblocked
    ObserveOn: OnNext(0) on Thread: 1
    ObserveOn: OnCompleted() on Thread: 1

    通过 ObserveOnDispatcher 的调用只有在 Timer 运行后才能退出。

    关键点

    记住 Reactive Extensions 本质上是一个自由线程库是很有用的,并且试图尽可能地懒惰它运行在哪个线程上 - 你必须故意干扰 OnNextOnCompleted 并将特定的调度程序传递给接受它们的运算符(operator)改变这一点。

    observable 的使用者无法控制它在内部执行的操作 - ObserveOnDispatcherThread.Sleepdecorators,它们将观察者的表面区域和 observables 调用包装到 marshal。希望这些例子已经说明了这一点。

    关于c# - ObserveOn 和 SubscribeOn - 工作完成的地方,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20451939/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com