c# - Observable wrapper for AsyncProducerConsumerQueue

Reposted. Author: 行者123. Updated: 2023-11-30 22:53:07

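So I created an observable wrapper for Stephen Cleary's AsyncProducerConsumerQueue<T> using the code below.

I would like to know whether anyone here knows how I could have done this in a simpler way.

  • Could it have been written without a wrapper class?
  • Would it be possible to prevent errors from multiple wrappers being applied to one queue?
  • Could I make it connect on the first subscription instead of via a direct call to Connect? If so, what are the implications of that?
  • Finally, how would you have done it?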

using System;
using System.Collections.Immutable;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

static async Task ExampleUsage() {
    var queue = new AsyncProducerConsumerQueue<int>();
    var observable = queue.AsConnectableObservable();
    await queue.EnqueueAsync(1);
    observable.Subscribe(Console.WriteLine);
    observable.Connect();
    await queue.EnqueueAsync(2);
}

public static class AsyncExExtensions {
    public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {
        return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);
    }
}

class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {

    readonly AsyncProducerConsumerQueue<T> Queue;

    long _isConnected = 0;
    ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;

    public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {
        Queue = queue;
    }

    public IDisposable Connect() {
        if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");
        var cts = new CancellationTokenSource();
        var token = cts.Token;
        Task.Run(async () => {
            try {
                while (true) {
                    token.ThrowIfCancellationRequested();
                    var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);
                    foreach (var observer in Observers)
                        observer.OnNext(@event);
                }
            } catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {
                foreach (var observer in Observers)
                    observer.OnCompleted();
            }
        });
        return Disposable.Create(() => {
            cts.Cancel();
            cts.Dispose();
        });
    }

    readonly object subscriberListMutex = new object();
    public IDisposable Subscribe(IObserver<T> observer) {
        lock (subscriberListMutex) {
            Observers = Observers.Add(observer);
        }
        return Disposable.Create(() => {
            lock (subscriberListMutex) {
                Observers = Observers.Remove(observer);
            }
        });
    }
}

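Best answer

Disclaimer: I am not an expert, so I may have overlooked some aspects of this answer. Use it with caution!

Consider the following two demos. They behave differently when you have multiple observers. In the first demo the observers compete for items in the queue; in the second, each observer receives its own copy of every item.

Demo #1 - Cold observable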

var queue = new AsyncProducerConsumerQueue<int>();

// This is a cold observable, so each observer is fed by its own individual dequeue loop
// and therefore will be 'competing' with other observers for queued items.
var coldObservable = Observable
    // Create an observable that asynchronously waits for items to become available on the
    // queue and then emits them to the observer. This will be cancelled when the observer
    // is unsubscribed.
    .Create<int>(async (observer, cancellationToken) =>
    {
        while (true)
        {
            var item = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false);
            Console.WriteLine($"Dequeued {item}");
            observer.OnNext(item);
        }
    })
    // If an InvalidOperationException is thrown by the above, continue with
    // an empty observable instead of the error. This effectively catches an
    // `OnError(InvalidOperationException)` and turns it into an `OnCompleted()`.
    .Catch<int, InvalidOperationException>(exn =>
    {
        Console.WriteLine("Caught InvalidOperation");
        return Observable.Empty<int>();
    });

Console.WriteLine("TEST COLD");

await queue.EnqueueAsync(1);
Console.WriteLine("Enqueued 1");

Console.WriteLine("Subscribing A");
coldObservable.Subscribe(
    item => Console.WriteLine($"A received {item}"),
    () => Console.WriteLine("A completed"));

Console.WriteLine("Subscribing B");
coldObservable.Subscribe(
    item => Console.WriteLine($"B received {item}"),
    () => Console.WriteLine("B completed"));

await queue.EnqueueAsync(2);
Console.WriteLine("Enqueued 2");

await queue.EnqueueAsync(3);
Console.WriteLine("Enqueued 3");

queue.CompleteAdding();
Console.WriteLine("Completed adding");

Console.WriteLine("Waiting...");
await Task.Delay(2000);

Console.WriteLine("DONE");

// TEST COLD
// Enqueued 1
// Subscribing A
// Dequeued 1
// A received 1
// Subscribing B
// Enqueued 2
// Enqueued 3
// Completed adding
// Waiting...
// Dequeued 2
// Dequeued 3
// A received 2
// B received 3
// Caught InvalidOperation
// Caught InvalidOperation
// A completed
// B completed
// DONE

Demo #2 - Hot observable

var queue = new AsyncProducerConsumerQueue<int>();

// coldObservable is defined exactly as in Demo #1, over this queue.

// This is a hot observable, so each observer receives the same items from the queue.
var hotObservable = coldObservable
    // Publish the cold observable to create an `IConnectableObservable` that will subscribe
    // to the dequeue loop when connected and emit the same items to all observers.
    .Publish()
    // Automatically connect to the published observable when the first observer subscribes
    // and automatically disconnect when the last observer unsubscribes. This means that the
    // first observer will receive any items queued before it subscribes, but additional
    // observers will only receive items queued after they subscribed.
    .RefCount();

Console.WriteLine("TEST HOT");

await queue.EnqueueAsync(1);
Console.WriteLine("Enqueued 1");

Console.WriteLine("Subscribing A");
hotObservable.Subscribe(
    item => Console.WriteLine($"A received {item}"),
    () => Console.WriteLine("A completed"));

Console.WriteLine("Subscribing B");
hotObservable.Subscribe(
    item => Console.WriteLine($"B received {item}"),
    () => Console.WriteLine("B completed"));

await queue.EnqueueAsync(2);
Console.WriteLine("Enqueued 2");

await queue.EnqueueAsync(3);
Console.WriteLine("Enqueued 3");

queue.CompleteAdding();
Console.WriteLine("Completed adding");

Console.WriteLine("Waiting...");
await Task.Delay(2000);

Console.WriteLine("DONE");

// TEST HOT
// Enqueued 1
// Subscribing A
// Dequeued 1
// A received 1
// Subscribing B
// Enqueued 2
// Enqueued 3
// Dequeued 2
// Completed adding
// Waiting...
// A received 2
// B received 2
// Dequeued 3
// A received 3
// B received 3
// Caught InvalidOperation
// A completed
// B completed
// DONE

To answer your original questions:

Could it have been written without a wrapper class?

Yes, see the demos above.

Would it be possible to prevent errors from multiple wrappers being applied to one queue?

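The approach demonstrated above does not prevent other parties from dequeuing items (or doing anything else with the queue). If you want to ensure that only one IObservable<T> is ever exposed for a given queue, consider encapsulating the queue itself: create an ObservableProducerConsumerQueue<T> that creates and manages its own AsyncProducerConsumerQueue<T> internally. You can expose an EnqueueAsync method that simply delegates to the internal queue, and then either expose the observable as a property, built like one of the demo observables above, or implement the IObservable<T> interface.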
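
The encapsulation described here might look roughly like the sketch below. Only the class name ObservableProducerConsumerQueue<T> and the EnqueueAsync method come from the answer; the Items property, the CompleteAdding pass-through, and the choice of the hot (Publish + RefCount) variant are assumptions made for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Nito.AsyncEx;

// Sketch only: member names other than EnqueueAsync are hypothetical.
public sealed class ObservableProducerConsumerQueue<T>
{
    // The queue is private, so nothing outside this class can dequeue from it.
    private readonly AsyncProducerConsumerQueue<T> _queue = new AsyncProducerConsumerQueue<T>();

    public ObservableProducerConsumerQueue()
    {
        Items = Observable
            // Same dequeue loop as in the demos; cancelled when the subscription is disposed.
            .Create<T>(async (observer, cancellationToken) =>
            {
                while (true)
                {
                    var item = await _queue.DequeueAsync(cancellationToken).ConfigureAwait(false);
                    observer.OnNext(item);
                }
            })
            // DequeueAsync throws InvalidOperationException once the queue is completed
            // and empty; turn that error into a normal completion.
            .Catch<T, InvalidOperationException>(_ => Observable.Empty<T>())
            // Share a single dequeue loop between all observers (the Demo #2 behaviour).
            .Publish()
            .RefCount();
    }

    // The only observable ever exposed for the internal queue.
    public IObservable<T> Items { get; }

    // Producers go through these pass-through methods.
    public Task EnqueueAsync(T item) => _queue.EnqueueAsync(item);

    public void CompleteAdding() => _queue.CompleteAdding();
}
```

A caller would then write something like `var q = new ObservableProducerConsumerQueue<int>(); q.Items.Subscribe(Console.WriteLine); await q.EnqueueAsync(1);`. Because the Catch operator sits downstream of the loop, an InvalidOperationException thrown by an observer's own OnNext handler would also be reported as completion, which may or may not be what you want.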

Could I make it connect on the first subscription instead of via a direct call to Connect? If so, what are the implications of that?

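Demo #2 shows this behaviour and describes its implications. If you want to be able to subscribe observers before connecting, skip the RefCount call and use the IConnectableObservable<T> returned by Publish, calling Connect yourself as before.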
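
As a rough sketch of that variant, the code below stops at Publish and connects explicitly once both observers are subscribed. The names ConnectableQueueExample, ToConnectable and RunAsync are hypothetical, and the lists and TaskCompletionSource exist only so the example can wait for completion instead of sleeping.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Nito.AsyncEx;

public static class ConnectableQueueExample
{
    // Hypothetical helper: the Demo #1 pipeline followed by Publish, without RefCount.
    public static IConnectableObservable<T> ToConnectable<T>(AsyncProducerConsumerQueue<T> queue) =>
        Observable
            .Create<T>(async (observer, cancellationToken) =>
            {
                while (true)
                {
                    var item = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false);
                    observer.OnNext(item);
                }
            })
            .Catch<T, InvalidOperationException>(_ => Observable.Empty<T>())
            .Publish();

    public static async Task<(List<int> A, List<int> B)> RunAsync()
    {
        var queue = new AsyncProducerConsumerQueue<int>();
        var connectable = ToConnectable(queue);
        var a = new List<int>();
        var b = new List<int>();
        var completed = new TaskCompletionSource<bool>();

        // Queued before anyone subscribes or connects.
        await queue.EnqueueAsync(1);

        // Subscribing does not start the dequeue loop yet.
        connectable.Subscribe(item => a.Add(item));
        connectable.Subscribe(item => b.Add(item), () => completed.TrySetResult(true));

        // Connect starts the single shared dequeue loop; disposing the handle stops it.
        using (connectable.Connect())
        {
            await queue.EnqueueAsync(2);
            queue.CompleteAdding();
            await completed.Task;
        }

        return (a, b);
    }
}
```

Since both observers are attached before Connect is called, neither should miss the item that was queued up front, which is the main difference from the RefCount version in Demo #2.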

Finally, how would you have done it?

As described above, I would encapsulate the queue and expose an IObservable<T> or IConnectableObservable<T> using one of the approaches demonstrated above.

Regarding "c# - Observable wrapper for AsyncProducerConsumerQueue", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57606311/
