gpt4 book ai didi

c# - 使用 Rx 为 webservice 调用创建一个轮询请求

转载 作者:行者123 更新时间:2023-11-30 19:55:59 27 4
gpt4 key购买 nike

在 C# 中使用 Rx 我正在尝试创建对 REST API 的轮询请求。我面临的问题是,Observable 需要按顺序发送响应。意味着如果请求 A 在 X 时间发出,请求 B 在 X + dx 时间发出,并且 B 的响应在 A 之前发出,则 Observable 表达式应该忽略或取消请求 A。

我已经编写了一个示例代码来描述该场景。我该如何修复它以仅获取最新响应并取消或忽略以前的响应。

 class Program
{
static int i = 0;

static void Main(string[] args)
{
GenerateObservableSequence();

Console.ReadLine();
}

private static void GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));

var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;

int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});

var obs = from t in timerData
from data in asyncCall
select data;

var hot = obs.Publish();
hot.Connect();

hot.Subscribe(j =>
{
Console.WriteLine("{0}", j);
});
}
}

@Enigmativity 回答后:添加了 Polling Aync 功能以始终获取最新响应:

 public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
{
return Observable
.Create<T>(o =>
{
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
.SelectMany(nr =>
Observable.FromAsync<T>(AsyncCall),
(nr, obj) => new { nr, obj})
.Do(res => z = Math.Max(z, res.nr))
.Where(res => res.nr >= z)
.Select(res => res.obj)
.Subscribe(o);
});

}

最佳答案

这是一种常见的情况,可以简单地解决。

有问题的示例代码的关键部分是

var obs = from t in timerData
from data in asyncCall
select data;

这可以理解为“对于 timerData 中的每个值,获取 asyncCall 中的所有值”。这是 SelectMany(或 FlatMap)运算符。 SelectMany 运算符将从内部序列 (asyncCall) 中获取所有值,并在收到时返回它们的值。这意味着您可以获得乱序值。

您想要的是在外部序列 (timerData) 产生新值时取消先前的内部序列。为此,我们想改用 Switch 运算符。

var obs = timerData.Select(_=>asyncCall)
.Switch();

完整的代码可以被清理为以下内容。 (删除了多余的发布/连接,按键处理订阅)

类(class)计划 { 静态整数 i = 0;

    static void Main(string[] args)
{
using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x)))
{
Console.ReadLine();
}
}

private static IObservable<int> GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));

var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;

int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});

return from t in timerData
from data in asyncCall
select data;
}
}

--编辑--

看来我误解了这个问题。而@Enigmativity 提供了更准确的答案。这是对他的回答的清理。

//Probably should be a field?
var rnd = new Random();
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
//.Select(n => new { n, r = ++i })
//No need for the `i` counter. Rx does this for us with this overload of `Select`
.Select((val, idx) => new { Value = val, Index = idx})
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
(nr, _) => nr)
//.Do(nr => z = Math.Max(z, nr.n))
//.Where(nr => nr.n >= z)
//Replace external State and Do with scan and Distinct
.Scan(new { Value = 0L, Index = -1 }, (prev, cur) => {
return cur.Index > prev.Index
? cur
: prev;
})
.DistinctUntilChanged()
.Select(nr => nr.Value)
.Dump();

关于c# - 使用 Rx 为 webservice 调用创建一个轮询请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35286412/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com