gpt4 book ai didi

c# - 如何通过对原始序列的值运行任务来创建 Rx 序列?

转载 作者:太空狗 更新时间:2023-10-29 23:32:03 25 4
gpt4 key购买 nike

我有一个 IObservable<T> 类型的序列和映射 T, CancellationToken 的函数到 Task<U> .获得 IObservable<U> 的最干净的方法是什么?在他们之外?

我需要以下语义:

  • 每项任务在前一项任务完成后开始
  • 如果任务已被取消或出错,则跳过
  • 严格保留原始序列的顺序

这是我看到的签名:

public static IObservable<U> Select<T, U> (
this IObservable<T> source,
Func<T, CancellationToken, Task<U>> selector
);

我还没有写过任何代码,但我会写,除非有人比我先写。
无论如何,我不熟悉 Window 这样的运算符,所以我的解决方案可能不太优雅。

我需要 C# 4 中的解决方案,但为了比较,也欢迎使用 C# 5 的答案。


如果你很好奇,下面是我的真实场景,或多或少:

Dropbox.GetImagesRecursively ()
.ObserveOn (SynchronizationContext.Current)
.Select (DownloadImage)
.Subscribe (AddImageToFilePicker);

最佳答案

到目前为止,这似乎对我有用:

public static IObservable<U> Select<T, U> (
this IObservable<T> source,
Func<T, CancellationToken, Task<U>> selector)
{
return source
.Select (item =>
Observable.Defer (() =>
Observable.StartAsync (ct => selector (item, ct))
.Catch (Observable.Empty<U> ())
))
.Concat ();
}

我们将一个延迟的基于任务的异常吞噬可观察对象映射到每个项目,然后连接它们。


我的思考过程是这样的。

我注意到其中一个 SelectMany 重载几乎完全符合我的要求,甚至具有完全相同的签名。但是它不能满足我的需求:

  • 它会在原始项目出现时创建任务,而我需要等待每个任务完成
  • 它不提供跳过已取消和错误任务的选项

我查看了这个重载的实现并注意到它使用 FromAsync 来处理任务创建和取消:

public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
return SelectMany_<TSource, TTaskResult, TResult> (
source,
x => FromAsync (ct => taskSelector (x, ct)),
resultSelector
);
}

我将目光转向 FromAsync 以了解它是如何实现的,并且惊喜地发现它也是可组合的:

public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
return Defer (() => StartAsync (functionAsync));
}

我重用了 DeferStartAsync,同时还添加了 Catch 来处理错误。 DeferConcat 的组合确保任务相互等待并按原始顺序启动。

关于c# - 如何通过对原始序列的值运行任务来创建 Rx 序列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17616671/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com