gpt4 book ai didi

c# - 使用 Rx 阻塞(并可能超时)异步操作

转载 作者:太空狗 更新时间:2023-10-29 17:39:38 24 4
gpt4 key购买 nike

我正在尝试使用 Reactive Extensions for .NET 重写一些代码,但我需要一些关于如何实现我的目标的指导。

我有一个类在低级库中封装了一些异步行为。想一想读取或写入网络的东西。当类(class)启动时,它会尝试连接到环境,成功后会通过从工作线程调用来发回信号。

我想将这种异步行为转变为同步调用,我在下面创建了一个大大简化的示例来说明如何实现:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}

在工作线程上运行 AsyncStart 只是模拟库的异步行为的一种方式,并不是我的真实代码的一部分,低级库提供线程并调用我的代码回调。

请注意,如果启动未在超时间隔内完成,Start 方法将抛出一个 TimeoutException

我想重写这段代码以使用 Rx。这是我的第一次尝试:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}

这是一次不错的尝试,但不幸的是它包含了竞争条件。如果启动完成(例如,如果delay 为 0)并且如果在 A 点有额外的延迟,则 OnNext 将在 readySubjectFirst 执行之前。本质上,我正在应用 TimeoutFirstIObservable 从未看到启动已完成,并且 TimeoutException 将是而是抛出。

Observable.Defer 似乎就是为了处理这样的问题而创建的。这是使用 Rx 的稍微复杂的尝试:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}

现在异步操作不会立即开始,只有在使用 IObservable 时才会开始。不幸的是,仍然存在竞争条件,但这次是在 B 点。如果异步操作在 Defer lambda 返回之前开始调用 OnNext,它仍然会丢失并且出现 TimeoutException 将被 Timeout 抛出。

我知道我可以使用像 Replay 这样的操作符来缓冲事件,但我最初的例子没有使用 Rx 没有使用任何类型的缓冲。有没有办法让我使用 Rx 来解决我的问题而没有竞争条件?本质上只有在 IObservable 连接到 TimeoutFirst?

后才开始异步操作

根据 Ana Betts 的回答,这里是可行的解决方案:

void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}

有趣的是,当 C 点的延迟比 AsyncStart 完成所需的时间长时。 AsyncSubject 保留最后发送的通知,TimeoutFirst 仍将按预期执行。

最佳答案

所以,关于 Rx,我想很多人一开始都知道一件事(包括我自己!):如果您使用任何传统的线程函数,如 ResetEvents、Thread.Sleeps 或其他任何东西,那么您就是在做错误 (tm) - 这就像在 LINQ 中将内容转换为数组,因为您知道基础类型恰好是数组。

要知道的关键是异步函数由返回 IObservable<TResult> 的函数表示。 - 这是让您在某事完成时发出信号的神奇调味料。所以这里是你如何“Rx-ify”一个更传统的异步函数,就像你在 Silverlight 网络服务中看到的那样:

IObservable<byte[]> readFromNetwork()
{
var ret = new AsyncSubject();
// Here's a traditional async function that you provide a callback to
asyncReaderFunc(theFile, buffer => {
ret.OnNext(buffer);
ret.OnCompleted();
});

return ret;
}

This is a decent attempt but unfortunately it contains a race condition.

这是AsyncSubject的地方进来 - 这确保即使 asyncReaderFunc 击败了订阅,AsyncSubject 仍然会“重播”发生的事情。

所以,现在我们已经有了我们的函数,我们可以对它做很多有趣的事情:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.Subscribe(ms => {
Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
});

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.First();

关于c# - 使用 Rx 阻塞(并可能超时)异步操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4715850/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com