- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要将数据发送到外部 api,但此 API 对每个端点的请求数有限制(即:每分钟 60 个请求)。
数据来自Kafka,然后每条消息都转到redis(因为我可以发送一个包含200个项目的请求)。所以,我使用一个简单的缓存来帮助我,我可以保证如果我的服务器出现故障,我不会丢失任何消息。
问题是,有时候 Kafka 开始发送很多消息,然后 redis 开始增长(超过 100 万条消息要发送到 api),而且我们不能在消息进来时发出太快的请求. 然后,我们有一个很大的延迟。
我的第一个代码很简单:ExecutorService executor = Executors.newFixedThreadPool(1);
当消息很少并且延迟最小时,这非常有效。
所以,我做的第一件事就是将执行器更改为:ExecutorService executor = Executors.newCachedThreadPool();
所以我可以要求新线程,因为我需要更快地向外部 api 发出请求,但是,我遇到了每分钟请求数限制的问题。
有些端点我可以每分钟发出 300 个请求,其他 500 个,其他 30 个等等。
我做的代码不是很好,这是我工作的公司,所以,我真的需要把它做得更好。
所以,每次我要请求外部api时,我都会调用makeRequest方法,这个方法是同步的,我知道我可以使用同步列表,但我认为在这种情况下同步方法更好。
// This is an inner class
private static class IntegrationType {
final Queue<Long> requests; // This queue is used to store the timestamp of the requests
final int maxRequestsPerMinute; // How many requests I can make per minute
public IntegrationType(final int maxRequestsPerMinute) {
this.maxRequestsPerMinute = maxRequestsPerMinute;
this.requests = new LinkedList<>();
}
synchronized void makeRequest() {
final long current = System.currentTimeMillis();
requests.add(current);
if (requests.size() >= maxRequestsPerMinute) {
long first = requests.poll(); // gets the first request
// The difference between the current request and the first request of the queue
final int differenceInSeconds = (int) (current - first) / 1000;
// if the difference is less than the maximum allowed
if (differenceInSeconds <= 60) {
// seconds to sleep.
final int secondsToSleep = 60 - differenceInSeconds;
sleep(secondsToSleep);
}
}
}
void sleep( int seconds){
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
那么,有一个我可以使用的数据结构吗?
最佳答案
如果正确理解您的问题,您可以使用 BlockingQueue
与 ScheduledExecutorService
如下。BlockingQueue
s有方法put
如果有可用空间,它只会在队列中添加给定元素,否则方法调用将等待(直到有可用空间)。他们也有方法take
如果有任何元素,它只会从队列中删除一个元素,否则方法调用将等待(直到至少有一个元素要获取)。
具体来说,您可以使用 LinkedBlockingQueue
或 ArrayBlockingQueue
可以使用固定大小的元素在任何给定时间保存。这个固定大小意味着您可以使用 put
提交尽可能多的请求,但你只会take
请求并每秒处理一次或其他(例如,每分钟发出 60 个请求)。
实例化 LinkedBlockingQueue
对于固定大小,只需使用相应的构造函数(接受大小作为参数)。 LinkedBlockingQueue
将 take
根据其文档按 FIFO 顺序排列元素。
实例化 ArrayBlockingQueue
对于固定大小,使用接受大小但也接受 boolean
的构造函数标志命名 fair
.如果这个标志是 true
那么队列将take
元素也按 FIFO 顺序排列。
那么你就可以拥有一个 ScheduledExecutorService
(而不是在循环中等待)您可以在其中提交单个 Runnable
这将 take
从队列中,与外部 API 进行通信,然后等待通信之间所需的延迟。
下面是上面的一个简单的演示示例:
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static class RequestSubmitter implements Runnable {
private final BlockingQueue<Request> q;
public RequestSubmitter(final BlockingQueue<Request> q) {
this.q = Objects.requireNonNull(q);
}
@Override
public void run() {
try {
q.put(new Request()); //Will block until available capacity.
}
catch (final InterruptedException ix) {
System.err.println("Interrupted!"); //Not expected to happen under normal use.
}
}
}
public static class Request {
public void make() {
try {
//Let's simulate the communication with the external API:
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
}
catch (final InterruptedException ix) {
//Let's say here we failed to communicate with the external API...
}
}
}
public static class RequestImplementor implements Runnable {
private final BlockingQueue<Request> q;
public RequestImplementor(final BlockingQueue<Request> q) {
this.q = Objects.requireNonNull(q);
}
@Override
public void run() {
try {
q.take().make(); //Will block until there is at least one element to take.
System.out.println("Request made.");
}
catch (final InterruptedException ix) {
//Here the 'taking' from the 'q' is interrupted.
}
}
}
public static void main(final String[] args) throws InterruptedException {
/*The following initialization parameters specify that we
can communicate with the external API 60 times per 1 minute.*/
final int maxRequestsPerTime = 60;
final TimeUnit timeUnit = TimeUnit.MINUTES;
final long timeAmount = 1;
final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
//final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);
//Submit some RequestSubmitters to the pool...
final ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 50_000; ++i)
pool.submit(new RequestSubmitter(q));
System.out.println("Serving...");
//Find out the period between communications with the external API:
final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
//We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.
//The most important line probably:
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
}
}
请注意,我使用了
scheduleWithFixedDelay
而不是
scheduleAtFixedRate
.您可以在他们的文档中看到第一个将等待提交
Runnable
的调用结束之间的延迟。开始下一个,而第二个不会等待,只需重新提交
Runnable
每
period
时间单位。但是我们不知道与外部 API 通信需要多长时间,那么如果我们
scheduleAtFixedRate
呢?与
period
每分钟一次,但请求需要一分钟以上才能完成?...然后将在第一个尚未完成时提交新请求。所以这就是我使用
scheduleWithFixedDelay
的原因而不是
scheduleAtFixedRate
.但还有更多:我使用了单线程调度执行器服务。是不是说第一次调用没有结束,第二次就不能开始了?... 看来是这样,你看一下
Executors#newSingleThreadScheduledExecutor()
的实现,因为单线程核心池的大小可能会发生第二次调用,并不意味着池的大小是固定的。
scheduleWithFixedDelay
的另一个原因是因为请求下溢。例如队列为空怎么办?然后调度也应该等待而不是提交
Runnable
再次。
scheduleWithFixedDelay
,说延迟
1/60f
调度之间的秒数,一分钟内提交了60多个请求,那么这肯定会让我们对外部API的吞吐量下降,因为有了
scheduleWithFixedDelay
我们可以保证最多向外部 API 发出 60 个请求。它可以比这少,但我们不希望它如此。我们希望每次都达到极限。如果这不是您关心的问题,那么您已经可以使用上述实现了。
BlockingQueue
,我找不到会发生什么。我建议的实现不是
put
按 FIFO 顺序排列。我的意思是,如果队列已满时 2 个请求几乎同时到达怎么办?他们都会等待,但第一个到达的人会等待并得到
put
第一个,或者第二个是
put
先编?我不知道。如果您不关心在外部 API 上发出的某些请求无序,那么请不要担心并使用代码到此为止。但是,如果您确实关心,并且您可以在每个请求中输入序列号,那么您可以使用
PriorityQueue
后
BlockingQueue
,甚至可以尝试
PriorityBlockingQueue
(不幸的是,这是无限的)。那会使事情变得更加复杂,所以我没有用
PriorityQueue
发布相关代码。 .至少我尽力了,我希望我能提出一些好的想法。我并不是说这篇文章是您所有问题的完整解决方案,但它是一些考虑因素。
关于java - 用于管理 api 每分钟最大请求数的数据结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65258130/
我正在编写一个具有以下签名的 Java 方法。 void Logger(Method method, Object[] args); 如果一个方法(例如 ABC() )调用此方法 Logger,它应该
我是 Java 新手。 我的问题是我的 Java 程序找不到我试图用作的图像文件一个 JButton。 (目前这段代码什么也没做,因为我只是得到了想要的外观第一的)。这是我的主课 代码: packag
好的,今天我在接受采访,我已经编写 Java 代码多年了。采访中说“Java 垃圾收集是一个棘手的问题,我有几个 friend 一直在努力弄清楚。你在这方面做得怎么样?”。她是想骗我吗?还是我的一生都
我的 friend 给了我一个谜语让我解开。它是这样的: There are 100 people. Each one of them, in his turn, does the following
如果我将使用 Java 5 代码的应用程序编译成字节码,生成的 .class 文件是否能够在 Java 1.4 下运行? 如果后者可以工作并且我正在尝试在我的 Java 1.4 应用程序中使用 Jav
有关于why Java doesn't support unsigned types的问题以及一些关于处理无符号类型的问题。我做了一些搜索,似乎 Scala 也不支持无符号数据类型。限制是Java和S
我只是想知道在一个 java 版本中生成的字节码是否可以在其他 java 版本上运行 最佳答案 通常,字节码无需修改即可在 较新 版本的 Java 上运行。它不会在旧版本上运行,除非您使用特殊参数 (
我有一个关于在命令提示符下执行 java 程序的基本问题。 在某些机器上我们需要指定 -cp 。 (类路径)同时执行java程序 (test为java文件名与.class文件存在于同一目录下) jav
我已经阅读 StackOverflow 有一段时间了,现在我才鼓起勇气提出问题。我今年 20 岁,目前在我的家乡(罗马尼亚克卢日-纳波卡)就读 IT 大学。足以介绍:D。 基本上,我有一家提供簿记应用
我有 public JSONObject parseXML(String xml) { JSONObject jsonObject = XML.toJSONObject(xml); r
我已经在 Java 中实现了带有动态类型的简单解释语言。不幸的是我遇到了以下问题。测试时如下代码: def main() { def ks = Map[[1, 2]].keySet()
一直提示输入 1 到 10 的数字 - 结果应将 st、rd、th 和 nd 添加到数字中。编写一个程序,提示用户输入 1 到 10 之间的任意整数,然后以序数形式显示该整数并附加后缀。 public
我有这个 DownloadFile.java 并按预期下载该文件: import java.io.*; import java.net.URL; public class DownloadFile {
我想在 GUI 上添加延迟。我放置了 2 个 for 循环,然后重新绘制了一个标签,但这 2 个 for 循环一个接一个地执行,并且标签被重新绘制到最后一个。 我能做什么? for(int i=0;
我正在对对象 Student 的列表项进行一些测试,但是我更喜欢在 java 类对象中创建硬编码列表,然后从那里提取数据,而不是连接到数据库并在结果集中选择记录。然而,自从我这样做以来已经很长时间了,
我知道对象创建分为三个部分: 声明 实例化 初始化 classA{} classB extends classA{} classA obj = new classB(1,1); 实例化 它必须使用
我有兴趣使用 GPRS 构建车辆跟踪系统。但是,我有一些问题要问以前做过此操作的人: GPRS 是最好的技术吗?人们意识到任何问题吗? 我计划使用 Java/Java EE - 有更好的技术吗? 如果
我可以通过递归方法反转数组,例如:数组={1,2,3,4,5} 数组结果={5,4,3,2,1}但我的结果是相同的数组,我不知道为什么,请帮助我。 public class Recursion { p
有这样的标准方式吗? 包括 Java源代码-测试代码- Ant 或 Maven联合单元持续集成(可能是巡航控制)ClearCase 版本控制工具部署到应用服务器 最后我希望有一个自动构建和集成环境。
我什至不知道这是否可能,我非常怀疑它是否可能,但如果可以,您能告诉我怎么做吗?我只是想知道如何从打印机打印一些文本。 有什么想法吗? 最佳答案 这里有更简单的事情。 import javax.swin
我是一名优秀的程序员,十分优秀!