gpt4 book ai didi

system.reactive - RX 故障的解决方法?

转载 作者:行者123 更新时间:2023-12-04 06:53:38 26 4
gpt4 key购买 nike

我正在各种平台上试验 Reactive Extensions,让我有点恼火的一件事是故障。

即使对于 UI 代码,这些故障可能 not be that problematic ,通常可以找到一个可以解决它们的运算符,但我仍然发现在出现故障的情况下调试代码更加困难:中间结果对调试并不重要,但我不知道结果何时是中间或“最终” .

在 Haskell 和同步数据流系统中使用纯功能 FRP 进行了一些工作,它也“感觉”错误,但这当然是主观的。

但是当将 RX 连接到非 UI 执行器(如电机或开关)时,我认为故障会更成问题。如何确保只有正确的值被发送到外部执行器?

也许这可以通过一些“调度程序”来解决,该调度程序知道某个“外部传感器”何时触发了启动事件,以便在将最终结果转发给执行器之前处理所有内部事件。类似于 flapjax paper 中描述的内容.

我希望得到答案的问题是:

  • RX 中是否有某些东西无法修复同步通知的故障?
  • 如果没有,是否存在用于修复同步故障的 RX(最好是生产质量的)库或方法?特别是对于单线程 Javascript 这可能有意义吗?
  • 如果不存在通用解决方案,那么如何使用 RX 来控制外部传感器/执行器而不会在执行器上出现故障?

  • 让我举个例子吧

    假设我想打印一系列元组 (a,b)契约(Contract)在哪里
    a=n    b=10 * floor(n/10)

    n 是自然数流 = 0,1,2....

    所以我期待以下顺序
    (a=0, b=0)
    (a=1, b=0)
    (a=2, b=0)
    ...
    (a=9, b=0)
    (a=10, b=10)
    (a=11, b=10)
    ...

    在 RX 中,为了让事情更有趣,我将使用过滤器来计算 b 流
    var n = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Publish()
    .RefCount();

    var a = n.Select(t => "a=" + t);

    var b = n.Where(t => t % 10 == 0)
    .Select(t => "b=" + t);

    var ab = a.CombineLatest(b, Tuple.Create);

    ab.Subscribe(Console.WriteLine);

    这给出了我认为是一个小故障(暂时违反不变量/契约(Contract)):
    (a=0, b=0)
    (a=1, b=0)
    (a=2, b=0)
    ...
    (a=10, b=0) <-- glitch?
    (a=10, b=10)
    (a=11, b=10)

    我意识到这是 CombineLatest 的正确行为,但我也认为这被称为故障,因为在真正的纯 FRP 系统中,您不会得到这些违反中间不变性的结果。

    请注意,在此示例中,我将无法使用 Zip,而且 WithLatestFrom 也会给出不正确的结果。

    当然,我可以将这个例子简化为一个 monadic 计算,从不多播 n 个流事件(这意味着不能过滤而只能映射),但这不是重点:IMO 在 RX 中你总是得到一个“小故障” ' 每当您拆分并重新加入可观察流时:
        s
    / \
    a b
    \ /
    t

    例如,在 FlapJAX 中你不会遇到这些问题。

    这是否有意义?

    非常感谢,
    彼得

    最佳答案

    更新:让我尝试在 RX 上下文中回答我自己的问题。

    首先,我对“故障”的理解似乎是错误的。从纯 FRP 的角度来看,在我看来 RX 中的小故障,在 RX 中实际上似乎是正确的行为。

    所以我想在 RX 中,我们需要明确我们期望驱动传感器组合值的“时间”。

    在我自己的例子中,执行器是控制台,传感器是间隔 n .

    所以如果我改变我的代码

    ab.Subscribe(Console.WriteLine);

    进入
    ab.Sample(n).Subscribe(Console.WriteLine);

    然后只打印“正确”的值。

    这确实意味着,当我们得到一个组合来自传感器的值的可观察序列时,我们必须知道所有原始传感器,将它们全部合并,并在将任何值发送到执行器之前用合并的信号对值进行采样......

    因此,另一种方法是将 IObservable “提升”为“感知”结构,该结构可以记住并合并原始传感器,例如:
    public struct Sensed<T>
    {
    public IObservable<T> Values;
    public IObservable<Unit> Sensors;

    public Sensed(IObservable<T> values, IObservable<Unit> sensors)
    {
    Values = values;
    Sensors = sensors;
    }

    public IObservable<Unit> MergeSensors(IObservable<Unit> sensors)
    {
    return sensors == Sensors ? Sensors : Sensors.Merge(sensors);
    }

    public IObservable<T> MergeValues(IObservable<T> values)
    {
    return values == Values ? Values : Values.Merge(values);
    }
    }

    然后我们必须将所有 RX 方法转移到这个“感知”结构:
    public static class Sensed
    {
    public static Sensed<T> Sensor<T>(this IObservable<T> source)
    {
    var hotSource = source.Publish().RefCount();
    return new Sensed<T>(hotSource, hotSource.Select(_ => Unit.Default));
    }

    public static Sensed<long> Interval(TimeSpan period)
    {
    return Observable.Interval(period).Sensor();
    }

    public static Sensed<TOut> Lift<TIn, TOut>(this Sensed<TIn> source, Func<IObservable<TIn>, IObservable<TOut>> lifter)
    {
    return new Sensed<TOut>(lifter(source.Values), source.Sensors);
    }

    public static Sensed<TOut> Select<TIn, TOut>(this Sensed<TIn> source, Func<TIn, TOut> func)
    {
    return source.Lift(values => values.Select(func));
    }

    public static Sensed<T> Where<T>(this Sensed<T> source, Func<T, bool> func)
    {
    return source.Lift(values => values.Where(func));
    }

    public static Sensed<T> Merge<T>(this Sensed<T> source1, Sensed<T> source2)
    {
    return new Sensed<T>(source1.MergeValues(source2.Values), source1.MergeSensors(source2.Sensors));
    }

    public static Sensed<TOut> CombineLatest<TIn1, TIn2, TOut>(this Sensed<TIn1> source1, Sensed<TIn2> source2, Func<TIn1, TIn2, TOut> func)
    {
    return new Sensed<TOut>(source1.Values.CombineLatest(source2.Values, func), source1.MergeSensors(source2.Sensors));
    }

    public static IDisposable Actuate<T>(this Sensed<T> source, Action<T> next)
    {
    return source.Values.Sample(source.Sensors).Subscribe(next);
    }
    }

    我的例子然后变成:
    var n = Sensed.Interval(TimeSpan.FromMilliseconds(100));
    var a = n.Select(t => "a=" + t);
    var b = n.Where(t => t % 10 == 0).Select(t => "b=" + t);
    var ab = a.CombineLatest(b, Tuple.Create);
    ab.Actuate(Console.WriteLine);

    同样,只有“期望的”值会传递给执行器,但通过这种设计,原始传感器会在 Sensed 结构中记住。

    我不确定这些是否“有意义”(双关语),也许我应该放弃对纯 FRP 的渴望,并接受它。毕竟,时间是相对的 ;-)

    彼得

    关于system.reactive - RX 故障的解决方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35465791/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com