gpt4 book ai didi

c# - 为什么重复 Enumerable 到 Observable 转换 block

转载 作者:行者123 更新时间:2023-12-03 15:55:00 29 4
gpt4 key购买 nike

这是一个相当有教育意义的,出于好奇的问题。考虑以下片段:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
哪里 SubscribeDebug订阅一个简单的观察者:
public class DebugObserver<T> : IObserver<T>
{
public void OnCompleted()
{
Debug.WriteLine("Completed");
}

public void OnError(Exception error)
{
Debug.WriteLine("Error");
}

public void OnNext(T value)
{
Debug.WriteLine("Value: {0}", value);
}
}
这个的输出是:

Value: 0

Value: 1

Value: 2

Value: 3

Value: 4


然后块。有人可以帮助我理解它发生的根本原因以及为什么 observable 没有完成吗?我注意到它在没有 Concat 的情况下是完整的。调用,但被它阻塞。

最佳答案

我看过 the sourceToObservable并提炼出一个最小的实现。它确实重现了我们看到的行为。

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
}
else
{
observer.OnCompleted();
}

// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}

return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);

顺便说一句 - 问题的症结在于 CurrentThreadScheduler 的行为,这是使用的默认调度程序。
CurrentThreadScheduler的行为是如果调度已经在运行 Schedule正在被调用 - 它最终被排队。
        CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);

Console.WriteLine(2);
});

这打印 2 1 .这种排队行为是我们的失败。

observer.OnCompleted()被调用,它导致 Concat开始下一次枚举 - 然而,事情与我们开始时不一样 - 我们仍然在 observer => { } 中当我们尝试安排下一个时阻止。因此,下一个计划不是立即执行,而是排队。

现在 enumerator.MoveNext()陷入僵局。
它无法移动到下一项 - MoveNext正在阻塞直到下一个项目到达 - 只有在 ToObservable 安排的时候才能到达环形。

但是调度器只能通知 ToEnumerable随后 MoveNext()被阻止 - 一旦退出 loopRec - 它不能,因为它被 MoveNext 阻止了首先。

附录

这大约是什么 ToEnumerable (来自 GetEnumerator.cs )不(不是有效的实现):
    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();

using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait(); //this is where it blocks

if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}

Enumerables 预计会阻塞,直到产生下一个项目 - 这就是为什么有一个门控实现。不是 Enumerable.Range哪个阻塞,但是 ToEnumerable .

关于c# - 为什么重复 Enumerable 到 Observable 转换 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61041484/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com