
c# - Implementation of an HttpClient request limiter and buffer

Reposted · Author: 行者123 · Updated: 2023-12-05 05:43:19

In our project, some services use a key to make requests to a 3rd-party API.

This API has a shared rate limit across all of its endpoints (meaning a request to one endpoint triggers a 2-second cooldown before we can call a different endpoint).

We used to handle this with a timed background job, so that at any given time only one request was being made to any of the endpoints.

After some architectural rework, we no longer rely on timed background jobs, and now the HttpRequests can no longer be throttled, because multiple service instances are making requests to the API.

So, in our current setup:

We have several HttpClients configured, one for each API endpoint we need, i.e.:

    services.AddHttpClient<Endpoint1Service>(client =>
    {
        client.BaseAddress = new Uri(configOptions.Services.Endpoint1.Url);
    });

    services.AddHttpClient<Endpoint2Service>(client =>
    {
        client.BaseAddress = new Uri(configOptions.Services.Endpoint2.Url);
    });
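(For illustration only: one way to make several typed clients share a single cooldown is a common `DelegatingHandler` whose state is static, so every client instance goes through the same gate. `SharedCooldownHandler`, `FakeTransport`, and the hard-coded 2-second cooldown are assumptions, not part of the original setup.)

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handler: all clients registered with this handler share one
// static gate, so the 2-second cooldown applies across all endpoints.
public sealed class SharedCooldownHandler : DelegatingHandler
{
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);
    private static readonly TimeSpan Cooldown = TimeSpan.FromSeconds(2);

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        await Gate.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            // Release the gate only after the cooldown has elapsed,
            // without blocking the caller that just got its response.
            _ = Task.Delay(Cooldown).ContinueWith(_ => Gate.Release());
        }
    }
}

// Fake transport so the handler can be exercised without a real network call.
public sealed class FakeTransport : HttpMessageHandler
{
    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
        => Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK));
}
```

It would be registered via `services.AddTransient<SharedCooldownHandler>()` plus `.AddHttpMessageHandler<SharedCooldownHandler>()` on both `AddHttpClient` calls; since the gate is static, each client getting its own handler instance still shares the limit.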

Endpoint1Service and Endpoint2Service were previously accessed by a background job service:

    public async Task DoJob()
    {
        var items = await _repository.GetItems();

        foreach (var item in items)
        {
            var processedResult = await _endpoint1Service.DoRequest(item);

            await Task.Delay(2000);

            //...
        }

        // save all results
    }

But now these "endpoint" services are accessed concurrently, and a new instance is created each time, so there is no way to throttle the request rate.

One possible solution would be to create some kind of singleton request buffer, inject it into every service that uses this API, and throttle those requests so they go out at the given rate. The problem I see is that storing requests in an in-memory buffer seems dangerous in case something goes wrong.
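(A minimal sketch of that direction, for reference: instead of buffering requests in a queue, a singleton gate can make callers await their turn, so nothing sits in memory waiting to be lost. `ApiRateGate`, `RunAsync`, and the interval value are assumed names, not from the original post.)

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical singleton gate: registered once in DI and injected into every
// endpoint service. It serializes requests and enforces a minimum interval
// between them, shared across all endpoints.
public sealed class ApiRateGate
{
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
    private readonly TimeSpan _minInterval;
    private DateTime _lastRequestUtc = DateTime.MinValue;

    public ApiRateGate(TimeSpan minInterval) => _minInterval = minInterval;

    public async Task<T> RunAsync<T>(Func<Task<T>> request)
    {
        await _lock.WaitAsync();
        try
        {
            // Wait out whatever remains of the cooldown from the last request.
            var wait = _lastRequestUtc + _minInterval - DateTime.UtcNow;
            if (wait > TimeSpan.Zero)
                await Task.Delay(wait);

            return await request();
        }
        finally
        {
            _lastRequestUtc = DateTime.UtcNow;
            _lock.Release();
        }
    }
}
```

Registered once, e.g. `services.AddSingleton(new ApiRateGate(TimeSpan.FromSeconds(2)))`, each endpoint service then wraps its call: `await _gate.RunAsync(() => _httpClient.GetAsync(...))`. Because callers await the gate rather than enqueueing work items, there is no in-memory buffer of pending requests to lose if the process dies; at most the currently awaiting calls fail with their callers.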

Is this the direction I should be looking at, or is there something else I could try?

Best Answer

Hope this helps: I created the following for a similar scenario. Its goal is concurrency throttling across multiple threads. However, it also gives you a convenient wrapper around a request-processing pipeline. Additionally, it provides a per-client cap on the maximum number of concurrent requests (if you want to use that).

Create one instance per endpoint service, and set its thread count to 1 if you want to limit it to 1. If you want 4 concurrent requests to a given endpoint, set it to 4.

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Threading/APIProcessor/AsyncThreadedWorkItemProcessor.cs

https://github.com/tcwicks/ChillX/blob/master/src/ChillX.Threading/APIProcessor/ThreadedWorkItemProcessor.cs

The two implementations are interchangeable. If used in a web server context, the former is probably better, since it offloads work to the background thread pool rather than using foreground threads.

Example usage: in your case, if you want to rate-limit it to 1 concurrent request, you would set _maxWorkerThreads to a value of 1. If you want to rate-limit it to 4 concurrent requests, set it to 4.

    //Example Usage for WebAPI controller
    class Example
    {
        private static ThreadedWorkItemProcessor<DummyRequest, DummyResponse, int, WorkItemPriority> ThreadedProcessorExample = new ThreadedWorkItemProcessor<DummyRequest, DummyResponse, int, WorkItemPriority>(
            _maxWorkItemLimitPerClient: 100 // Maximum number of concurrent requests in the processing queue per client. Set to int.MaxValue to disable concurrent request caps
            , _maxWorkerThreads: 16 // Maximum number of threads to scale up to
            , _threadStartupPerWorkItems: 4 // Consider starting a new processing thread every X requests
            , _threadStartupMinQueueSize: 4 // Do NOT start a new processing thread if work item queue is below this size
            , _idleWorkerThreadExitSeconds: 10 // Idle threads will exit after X seconds
            , _abandonedResponseExpirySeconds: 60 // Expire processed work items after X seconds (Maybe the client terminated or the web request thread died)
            , _processRequestMethod: ProcessRequestMethod // Your Do Work method for processing the request
            , _logErrorMethod: Handler_LogError
            , _logMessageMethod: Handler_LogMessage
            );

        public async Task<DummyResponse> GetResponse([FromBody] DummyRequest _request)
        {
            int clientID = 1; //Replace with the client ID from your authentication mechanism if using per client request caps. Otherwise just hardcode to maybe 0 or whatever
            WorkItemPriority _priority;
            _priority = WorkItemPriority.Medium; //Assign the priority based on whatever prioritization rules.
            int RequestID = ThreadedProcessorExample.ScheduleWorkItem(_priority, _request, clientID);
            if (RequestID < 0)
            {
                //Client has exceeded maximum number of concurrent requests or Application Pool is shutting down
                //return a suitable error message here
                return new DummyResponse() { ErrorMessage = @"Maximum number of concurrent requests exceeded or service is restarting. Please retry request later." };
            }

            //If you need the result (like in a WebAPI controller) then do this.
            //Otherwise, if it is say a backend processing sink where there is no client waiting for a response, then we are done here; just return.

            KeyValuePair<bool, ThreadWorkItem<DummyRequest, DummyResponse, int>> workItemResult;

            workItemResult = await ThreadedProcessorExample.TryGetProcessedWorkItemAsync(RequestID,
                _timeoutMS: 1000, //Timeout of 1 second
                _taskWaitType: ThreadProcessorAsyncTaskWaitType.Delay_Specific,
                _delayMS: 10);
            if (!workItemResult.Key)
            {
                //Processing timeout or Application Pool is shutting down
                //return a suitable error message here
                return new DummyResponse() { ErrorMessage = @"Internal system timeout or service is restarting. Please retry request later." };
            }
            return workItemResult.Value.Response;
        }

        public static DummyResponse ProcessRequestMethod(DummyRequest request)
        {
            // Process the request and return the response
            return new DummyResponse() { orderID = request.orderID };
        }

        public static void Handler_LogError(Exception ex)
        {
            //Log unhandled exception here
        }

        public static void Handler_LogMessage(string Message)
        {
            //Log message here
        }
    }

Regarding "c# - Implementation of an HttpClient request limiter and buffer", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/71872023/
