gpt4 book ai didi

c# - Observable.FromAsync+Repeat+TakeWhile 的组合创建无限循环

转载 作者:行者123 更新时间:2023-11-30 15:58:08 25 4
gpt4 key购买 nike

有人可以解释为什么即使到达流的末尾,下面的 AsObservable 方法也会创建一个无限循环吗?

public static class StreamExt {
public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
return Observable
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
.SelectMany(bytes => bytes);
}

private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
var buf = new byte[bufferSize];
var bytesRead = await stream
.ReadAsync(buf, 0, bufferSize, cancel)
.ConfigureAwait(false);

if (bytesRead < 1) return null; // EndOfStream
var result_size = Math.Min(bytesRead, bufferSize);
Array.Resize(ref buf, result_size);
return buf;
}
}

快速测试表明它会产生无限循环:

class Program {
static void Main(string[] args) {
using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
var testResult = stream
.AsObservable(1024)
.ToEnumerable()
.ToArray();
Console.WriteLine(testResult.Length);
}
}
}

当然我可以添加一个 .SubscribeOn(TaskPoolScheduler.Default) 但是,无限循环仍然存在(阻止任务池调度程序 + 从 Stream 无限读取) .

[更新 2017-05-09]

Shlomo 发布了一个更好的示例来重现此问题:

int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
.Repeat()
.TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");

最佳答案

对于最终来到这里并需要答案的人,而不仅仅是解释:问题似乎是 FromAsync 的默认调度程序,如 this self-answered question 所示.如果您调整到“当前线程”调度程序 Repeat().TakeWhile(...) 的行为更可预测。例如。 (问题摘录):

.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), 
System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream

关于c# - Observable.FromAsync+Repeat+TakeWhile 的组合创建无限循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43865407/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com