gpt4 book ai didi

c# - 为我的Observable扩展方法选择“默认”调度程序

转载 作者:太空宇宙 更新时间:2023-11-03 20:10:00 25 4
gpt4 key购买 nike

(我看过this问题,但是...)

我正在尝试找出如何为我的Observable扩展方法选择默认调度程序。

第一个示例:如果我不使用Scheduler.Default离开当前线程,则此代码将卡在“生产”代码中:

public static IObservable<T> ResponsiveSample<T>(this IObservable<T> src, 
TimeSpan interval, IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return src.Publish(xs => xs.Take(1).Concat(xs.Sample(interval, scheduler)));
}


第二个示例(从 here窃取)。这在当前线程上是可以的。

public static IObservable<T> RetryAfterDelay<T>(this IObservable<T> source, 
TimeSpan dueTime, IScheduler scheduler = null)
{
return RepeateInfinite(source, dueTime, scheduler).Catch();
}

private static IEnumerable<IObservable<T>> RepeateInfinite<T>(IObservable<T> src,
TimeSpan dueTime, IScheduler scheduler = null)
{
yield return source; // Don't delay the first time

scheduler = scheduler ?? Scheduler.CurrentThread;
while(true)
yield return source.DelaySubscription(dueTime, scheduler);
}


问题:是否有一条经验法则可以帮助我预测哪个默认调度程序可能导致最少的调用代码痛苦?

最佳答案

摘要

通常,如果您的操作员安排将来的事件(例如,在适当的时间致电Schedule或等待其他事件响应)或它们反复产生事件,则应该提供Scheduler重载。

如果源流本质上是阻塞的,那实际上就是源流的业务。是否解决阻塞取决于您的扩展名-但是,在没有提供选项(通过调度程序参数)来避免这种情况的情况下,请勿将非阻塞流转换为阻塞流。

细节

您引用的问题是一个很好的起点。回顾一下,您可以看到该框架将其运算符分为5类:


AsyncConversions
ConstantTimeOperations
迭代
尾递归
基于时间的操作


但是,我认为对这些类别的调整使它们在讨论中更加有用:


产生未来事件(例如缓冲区,窗口,延迟)
迭代(例如范围,生成,重复)
其他


产生未来事件

Rx库提供的属于此类的运算符将始终允许您指定调度程序。如果不这样做,他们将使用Scheduler.Default,它将使用始终具有非阻塞语义的平台默认调度程序。从本质上讲,这些操作员将在将来某个时间安排事件。明确地说,到将来,我的意思是他们将通过在将来某个时间发送事件和/或通过调用Schedule指定到期时间来响应传入的事件。

因此,您几乎永远不会希望它们被阻塞,因此使用在不同线程(或与之等效的平台)上运行的调度程序是有意义的。

迭代

迭代运算符是那些会产生大量事件的运算符,这些事件可能会或可能不会在将来安排。例如,Range运算符安排它的事件以立即分发。但是,将整个范围立即推送到观察者的幼稚的实现方式将是有问题的-由于生成某个范围需要花费一些非零的时间,因此您不想删除将范围调度到某些指定的调度程序的选项,特别是在大范围的情况下。此外,您还需要特别注意如何将事件推送到调度程序上-如果您在一个调度动作中将整个范围转储到单个循环中,则可能会不公平地剥夺对调度程序的访问权限,从而对使用该命令的时间紧迫的调度动作造成不良后果调度程序(尤其是单线程)。

因此,实现了像Range这样的迭代运算符,以便每个迭代负责安排下一个迭代。这意味着它们的行为可能会因所使用的调度程序而有很大不同-使用立即调度程序,您将看到阻塞,而使用调度程序调度程序,您将不会看到阻塞或饥饿,因为事件将与其他调度操作交错。

其他类别

剩下的类别是针对那些操作员立即转换或产生事件的-因此它们本身是非阻塞的。这些运算符通常不会提供重载来允许您指定调度程序-它们只会使用被调用的线程来产生事件-这些事件在整个生命周期中仍可能会变化,并取决于所应用的流。如果它们以恒定时间(例如Observable.Return)发起事件,则通常会在Subscribe调用返回之前完成。

如何决定

因此,要回答您的问题,您确实需要考虑整个运营商链以做出适当的决定。链中的每个链接都可能引入将来的或迭代的调度。此外,IObservable<T>不能揭示单个操作员的行为-
给定的运算符是否包括带有计划参数的重载是一个很好的线索(对于内置运算符来说是非常可靠的),但不能保证。

guidelines (an essential read)中有很好的建议,可以帮助您决定是使用还是提供调度程序重载。以下是相关摘录:


  5.4。考虑将特定的调度程序传递给并发引入运算符
  与其使用ObserveOn运算符更改可观察序列在其上生成消息的执行上下文,不如在合适的位置开始并发。当操作员通过提供调度程序参数重载来对并发引入进行参数化时,传递正确的调度程序将减少使用ObserveOn运算符的位置。
  
  [...]
  
  何时忽略本指南
  当合并源自不同执行上下文的多个事件时,请使用准则5.5将所有消息尽可能晚地放在特定的执行上下文上。
  
  5.5。尽可能晚在尽可能少的地方调用ObserveOn运算符
  通过使用ObserveOn运算符,可以为通过原始可观察序列而来的每条消息安排一个操作。这可能会更改时序信息,并给系统带来额外的压力。稍后在查询中放置此运算符将减少这两个问题。
  
  [...]
  
  何时忽略本指南
  如果您对可观察序列的使用未绑定到特定的执行上下文,请忽略此准则。在这种情况下,请勿使用ObserveOn运算符。
  
  6.12。避免引入并发
  通过增加并发性,我们可以更改可观察序列的及时性。邮件将安排在以后到达。传递消息所需的时间是数据本身,通过增加并发性,我们使数据倾斜。
  
  [...]
  
  何时忽略本指南
  在引入并发是操作员所做工作的重要部分的情况下,请忽略此准则。
  注意:当我们使用即时计划程序或直接在对Subscribe的调用中调用观察者时,我们将使Subscribe呼叫阻塞。在这种情况下,任何昂贵的计算都将表明引入并发的可能性。
  
  6.14。运营商不应该阻止
  Rx是一个库,用于使用可观察的集合来组成异步和基于事件的程序。
  通过阻止运算符,我们将失去这些异步特性。我们还可能会失去可组合性(例如,通过返回键入为T而不是IObservable的值)。


根据经验,当且仅当我在实现中使用调度程序时,我才会在重载中提供调度程序选项-因为我直接使用一个调度程序,或者因为我调用了具有调度程序重载的运算符。我总是使用调度程序重载(如果可用)。这样一来,我就可以通过TestScheduler(在nuget pack rx-testing中)进行单元测试。

总是值得使用类似我的Spy method之类的方法来确定Subscribe调用是否立即返回并且观察者的OnXXX方法是在其他线程上调用的,还是运算符是否在调用时以恒定的时间立即完成运行事件线程并几乎立即从Subscribe调用返回(如Return中所示)-如果是这种情况,则给定的配置是非阻塞的,并且不需要Scheduler.Default

对于您的特定示例,源流是关键因素。

在两个示例中,您都可以通过提供当前的线程调度程序来进行阻止。但是,是否阻塞取决于源流的Subscribe是阻塞的还是非阻塞的,以及它调度的事件是什么调度程序。

它还取决于您“粘着”的意思-如果您仅在谈论Subscribe粘着,那么只需要检查(从最后应用的最后一个运算符开始向后应用)是否在非阻塞之前出现了阻塞Subscribe一。如果您正在谈论导致阻塞的中间事件,则需要考虑这些操作员的工作方式:

通常,如果您的操作员安排将来的事件(例如,在适当的时间致电Schedule或等待其他事件响应)或它们反复产生事件,则应该提供Scheduler重载。

如果源流本质上是阻塞的,那实际上就是源流的业务。是否解决阻塞取决于您的扩展名-但是,在没有提供选项(通过调度程序参数)来避免这种情况的情况下,请勿将非阻塞流转换为阻塞流。

关于c# - 为我的Observable扩展方法选择“默认”调度程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20627228/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com