gpt4 book ai didi

java - 用于管理 api 每分钟最大请求数的数据结构

转载 作者:行者123 更新时间:2023-12-03 12:58:21 24 4
gpt4 key购买 nike

我需要将数据发送到外部 api,但此 API 对每个端点的请求数有限制(即:每分钟 60 个请求)。
数据来自Kafka,然后每条消息都转到redis(因为我可以发送一个包含200个项目的请求)。所以,我使用一个简单的缓存来帮助我,我可以保证如果我的服务器出现故障,我不会丢失任何消息。
问题是,有时候 Kafka 开始发送很多消息,然后 redis 开始增长(超过 100 万条消息要发送到 api),而且我们不能在消息进来时发出太快的请求. 然后,我们有一个很大的延迟。
我的第一个代码很简单:ExecutorService executor = Executors.newFixedThreadPool(1);当消息很少并且延迟最小时,这非常有效。
所以,我做的第一件事就是将执行器更改为:ExecutorService executor = Executors.newCachedThreadPool();所以我可以要求新线程,因为我需要更快地向外部 api 发出请求,但是,我遇到了每分钟请求数限制的问题。
有些端点我可以每分钟发出 300 个请求,其他 500 个,其他 30 个等等。
我做的代码不是很好,这是我工作的公司,所以,我真的需要把它做得更好。
所以,每次我要请求外部api时,我都会调用makeRequest方法,这个方法是同步的,我知道我可以使用同步列表,但我认为在这种情况下同步方法更好。

// This is an inner class
private static class IntegrationType {

final Queue<Long> requests; // This queue is used to store the timestamp of the requests
final int maxRequestsPerMinute; // How many requests I can make per minute

public IntegrationType(final int maxRequestsPerMinute) {
this.maxRequestsPerMinute = maxRequestsPerMinute;
this.requests = new LinkedList<>();
}

synchronized void makeRequest() {
final long current = System.currentTimeMillis();
requests.add(current);
if (requests.size() >= maxRequestsPerMinute) {
long first = requests.poll(); // gets the first request

// The difference between the current request and the first request of the queue
final int differenceInSeconds = (int) (current - first) / 1000;

// if the difference is less than the maximum allowed
if (differenceInSeconds <= 60) {
// seconds to sleep.
final int secondsToSleep = 60 - differenceInSeconds;
sleep(secondsToSleep);
}
}
}

void sleep( int seconds){
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
那么,有一个我可以使用的数据结构吗?
我应该考虑什么?
提前致谢。

最佳答案

如果正确理解您的问题,您可以使用 BlockingQueue ScheduledExecutorService 如下。BlockingQueue s有方法put如果有可用空间,它只会在队列中添加给定元素,否则方法调用将等待(直到有可用空间)。他们也有方法take如果有任何元素,它只会从队列中删除一个元素,否则方法调用将等待(直到至少有一个元素要获取)。
具体来说,您可以使用 LinkedBlockingQueueArrayBlockingQueue可以使用固定大小的元素在任何给定时间保存。这个固定大小意味着您可以使用 put 提交尽可能多的请求,但你只会take请求并每秒处理一次或其他(例如,每分钟发出 60 个请求)。
实例化 LinkedBlockingQueue对于固定大小,只需使用相应的构造函数(接受大小作为参数)。 LinkedBlockingQueuetake根据其文档按 FIFO 顺序排列元素。
实例化 ArrayBlockingQueue对于固定大小,使用接受大小但也接受 boolean 的构造函数标志命名 fair .如果这个标志是 true那么队列将take元素也按 FIFO 顺序排列。
那么你就可以拥有一个 ScheduledExecutorService (而不是在循环中等待)您可以在其中提交单个 Runnable这将 take从队列中,与外部 API 进行通信,然后等待通信之间所需的延迟。
下面是上面的一个简单的演示示例:

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {

public static class RequestSubmitter implements Runnable {
private final BlockingQueue<Request> q;

public RequestSubmitter(final BlockingQueue<Request> q) {
this.q = Objects.requireNonNull(q);
}

@Override
public void run() {
try {
q.put(new Request()); //Will block until available capacity.
}
catch (final InterruptedException ix) {
System.err.println("Interrupted!"); //Not expected to happen under normal use.
}
}
}

public static class Request {
public void make() {
try {
//Let's simulate the communication with the external API:
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
}
catch (final InterruptedException ix) {
//Let's say here we failed to communicate with the external API...
}
}
}

public static class RequestImplementor implements Runnable {
private final BlockingQueue<Request> q;

public RequestImplementor(final BlockingQueue<Request> q) {
this.q = Objects.requireNonNull(q);
}

@Override
public void run() {
try {
q.take().make(); //Will block until there is at least one element to take.
System.out.println("Request made.");
}
catch (final InterruptedException ix) {
//Here the 'taking' from the 'q' is interrupted.
}
}
}

public static void main(final String[] args) throws InterruptedException {

/*The following initialization parameters specify that we
can communicate with the external API 60 times per 1 minute.*/
final int maxRequestsPerTime = 60;
final TimeUnit timeUnit = TimeUnit.MINUTES;
final long timeAmount = 1;

final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
//final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);

//Submit some RequestSubmitters to the pool...
final ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 50_000; ++i)
pool.submit(new RequestSubmitter(q));

System.out.println("Serving...");

//Find out the period between communications with the external API:
final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
//We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.

//The most important line probably:
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
}
}
请注意,我使用了 scheduleWithFixedDelay而不是 scheduleAtFixedRate .您可以在他们的文档中看到第一个将等待提交 Runnable 的调用结束之间的延迟。开始下一个,而第二个不会等待,只需重新提交 Runnableperiod时间单位。但是我们不知道与外部 API 通信需要多长时间,那么如果我们 scheduleAtFixedRate 呢?与 period每分钟一次,但请求需要一分钟以上才能完成?...然后将在第一个尚未完成时提交新请求。所以这就是我使用 scheduleWithFixedDelay 的原因而不是 scheduleAtFixedRate .但还有更多:我使用了单线程调度执行器服务。是不是说第一次调用没有结束,第二次就不能开始了?... 看来是这样,你看一下 Executors#newSingleThreadScheduledExecutor()的实现,因为单线程核心池的大小可能会发生第二次调用,并不意味着池的大小是固定的。
我使用 scheduleWithFixedDelay 的另一个原因是因为请求下溢。例如队列为空怎么办?然后调度也应该等待而不是提交 Runnable再次。
另一方面,如果我们使用 scheduleWithFixedDelay ,说延迟 1/60f调度之间的秒数,一分钟内提交了60多个请求,那么这肯定会让我们对外部API的吞吐量下降,因为有了 scheduleWithFixedDelay我们可以保证最多向外部 API 发出 60 个请求。它可以比这少,但我们不希望它如此。我们希望每次都达到极限。如果这不是您关心的问题,那么您已经可以使用上述实现了。
但是,假设您确实希望每次都尽可能接近限制,在这种情况下,据我所知,您可以使用自定义调度程序来做到这一点,这比第一个解决方案更不干净,但时间更多准确的。
底线,通过上述实现,您需要确保与外部 API 的通信以尽可能快地为请求提供服务。
最后,我应该警告你考虑如果 BlockingQueue,我找不到会发生什么。我建议的实现不是 put按 FIFO 顺序排列。我的意思是,如果队列已满时 2 个请求几乎同时到达怎么办?他们都会等待,但第一个到达的人会等待并得到 put第一个,或者第二个是 put先编?我不知道。如果您不关心在外部 API 上发出的某些请求无序,那么请不要担心并使用代码到此为止。但是,如果您确实关心,并且您可以在每个请求中输入序列号,那么您可以使用 PriorityQueueBlockingQueue ,甚至可以尝试 PriorityBlockingQueue (不幸的是,这是无限的)。那会使事情变得更加复杂,所以我没有用 PriorityQueue 发布相关代码。 .至少我尽力了,我希望我能提出一些好的想法。我并不是说这篇文章是您所有问题的完整解决方案,但它是一些考虑因素。

关于java - 用于管理 api 每分钟最大请求数的数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65258130/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com