gpt4 book ai didi

c# - 转换异步方法以返回 IObservable<>

转载 作者:太空狗 更新时间:2023-10-29 18:20:18 24 4
gpt4 key购买 nike

我有一个 async 方法,它是一个长时间运行的方法,它读取一个流,并在它发现某些东西时触发一个事件:

public static async void GetStream(int id, CancellationToken token)

它需要一个取消 token ,因为它是在新任务中创建的。它在读取流时在内部调用 await:

var result = await sr.ReadLineAsync()

现在,我想将其转换为返回 IObservable<> 的方法,以便我可以将其与响应式扩展一起使用。从我读到的内容来看,最好的方法是使用 Observable.Create,并且由于 RX 2.0 现在也支持异步,所以我可以像这样使用它:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
return Observable.Create<Message>(
async (IObserver<Message> observer) =>
{

内部的其余代码是相同的,但我调用的不是触发事件而是 observer.OnNext()。但是,这感觉不对。一方面,我在其中混合了 CancellationTokens,尽管添加 async 关键字使其起作用,但这实际上是最好的做法吗?我这样调用我的 ObservableStream:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));

最佳答案

你是对的。一旦您通过 IObservable 表示您的接口(interface),您应该避免要求调用者提供 CancellationToken。这并不意味着您不能在内部使用它们。 Rx 提供了多种机制来生成 CancellationToken 实例,当观察者取消订阅您的可观察对象时,这些实例将被取消。

有多种方法可以解决您的问题。最简单的方法几乎不需要更改代码。它使用 Observable.Create 的重载,它为您提供一个 CancellationToken,如果调用者取消订阅则触发:

public static IObservable<Message> ObservableStream(int id)
{
return Observable.Create<Message>(async (observer, token) =>
{
// no exception handling required. If this method throws,
// Rx will catch it and call observer.OnError() for us.
using (var stream = /*...open your stream...*/)
{
string msg;
while ((msg = await stream.ReadLineAsync()) != null)
{
if (token.IsCancellationRequested) { return; }
observer.OnNext(msg);
}
observer.OnCompleted();
}
});
}

关于c# - 转换异步方法以返回 IObservable<>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17465680/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com