gpt4 book ai didi

c# - 以最大速率处理请求

转载 作者:行者123 更新时间:2023-11-30 12:24:52 25 4
gpt4 key购买 nike

我使用 Rx 来确保我们的后端遵守某些第三方 API 的请求限制。

下面的实现使用了一个简单的 Subject<T>作为输入队列,然后使用 James World's custom Pace operator 驯服 .

这有效,但只要throttledRequestsObserveOn(TaskPoolScheduler.Default) 强制执行的主线程上未观察到.

一旦我注释掉这一行(第 61 行),程序的行为就好像 Pace根本没有使用运算符,并且请求在排队时以最快的速度再次得到处理。谁能解释这种行为?

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
public static class ObservableExtensions
{
/// <summary>
/// James World's Pace operater (see https://stackoverflow.com/a/21589238/88513)
/// </summary>
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i))
.Concat();
}
}

class Program
{
ISubject<int> requests;
IObservable<int> throttledRequests;

private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
{
var task = throttledRequests
.Where(x => x == work)
.Take(1)
.SelectMany(doWork)
.ToTask();

// queue it
requests.OnNext(work);

return task;
}

private Task<int> DoRequest(int x)
{
Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
return Task.FromResult(x);
}

private void Run()
{
// initialize request queue
requests = new Subject<int>();

// create a derived rate-limited queue
throttledRequests = requests
.Pace(TimeSpan.FromMilliseconds(1000))
.Publish()
.RefCount()
.ObserveOn(TaskPoolScheduler.Default);

Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);

int i = 0;

while (true)
{
// Queue a number of requests
var tasks = Enumerable.Range(i * 10, 10)
.Select(x => QueueRequest(x, DoRequest))
.ToArray();

Task.WaitAll(tasks);

Console.ReadLine();
i++;
}
}

static void Main(string[] args)
{
new Program().Run();
}
}
}

最佳答案

我无法完整回答问题(不确定为什么它在 ThreadPoolScheduler 上运行时运行)但我会告诉您我的想法并展示如何修复它以在有或没有 ThreadPoolScheduler 的情况下按预期运行。

首先,您可能会注意到,即使在 ThreadPoolScheduler 上,它也无法正常工作 - 通常前 1-3 个项目会在没有任何延迟的情况下得到处理。为什么在那之后他们开始延迟处理我仍然不清楚。现在说说原因。考虑以下示例代码:

var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;

在这里,不会有任何延迟,任务会立即完成。为什么?因为 StartWith 会立即在序列的开头注入(inject)“1”,然后 Take(1) 获取该值并完成 - 没有理由继续序列,因此永远不会执行延迟。例如,如果您改用 Take(2) - 它会在完成前延迟 10 秒。

出于完全相同的原因,您的代码永远不会进入延迟(例如,您可以通过在延迟后选择并记录到控制台来使用调试器验证这一点)。要修复,只需删除 Take(1)(或例如将其更改为 Take(2))——无论如何,每个键始终只有一项。当您这样做时,无论有没有 ThreadPoolScheduler,代码都会正确运行。

关于c# - 以最大速率处理请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32690875/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com