- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章C++ 实现线程安全的频率限制器(推荐)由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
很早以前,在学习使用 Python 的deque容器时,我写了一篇文章python3 deque 双向队列创建与使用方法分析。最近需要压测线上服务的性能,又不愿意总是在 QA 那边排队,于是需要自己写一个压测用的客户端。其中一个核心需求就是要实现 QPS 限制.
于是,终究逃不开要在 C++ 中实现一个线程安全的频率限制器.
设计思路 。
所谓频率限制,就是要在一个时间段(inteval)中,限制操作的次数(limit)。这又可以引出两种强弱不同的表述:
不难发现,强表述通过「滑动窗口」的方式,不仅限制了频率,还要求了操作在时间上的均匀性。前作的频率限制器,实际上对应了这里的强表述。但实际工程中,我们通常只需要实现弱表述的频率限制器就足够使用了.
对于弱表述,实现起来主要思路如下:
当操作计数(count)小于限制(limit)时直接放行; 。
单线程实现 。
在不考虑线程安全时,不难给出这样的实现.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
struct
ms_clock {
using
rep = std::chrono::milliseconds::rep;
using
period = std::chrono::milliseconds::period;
using
duration = std::chrono::duration<rep, period>;
using
time_point = std::chrono::time_point<ms_clock, duration>;
static
time_point now() noexcept {
return
time_point(std::chrono::duration_cast<duration>(
std::chrono::steady_clock::now().time_since_epoch()));
}
};
}
// namespace __details
class
RateLimiter {
public
:
using
clock
= __details::ms_clock;
// 1.
private
:
const
uint64_t limit_;
const
clock
::duration interval_;
const
clock
::rep interval_count_;
mutable
uint64_t count_{std::numeric_limits<uint64_t>::max()};
// 2.a.
mutable
clock
::rep timestamp_{0};
// 2.b.
public
:
constexpr RateLimiter(uint64_t limit,
clock
::duration interval) :
limit_(limit), interval_(interval), interval_count_(interval_.count()) {}
RateLimiter(
const
RateLimiter&) =
delete
;
// 3.a.
RateLimiter& operator=(
const
RateLimiter&) =
delete
;
// 3.b.
RateLimiter(RateLimiter&&) =
delete
;
// 3.c.
RateLimiter& operator=(RateLimiter&&) =
delete
;
// 3.d.
bool
operator()()
const
;
double
qps()
const
{
return
1000.0 *
this
->limit_ /
this
->interval_count_;
}
};
bool
RateLimiter::operator()()
const
{
auto orig_count =
this
->count_++;
if
(orig_count <
this
->limit_) {
// 4.
return
true
;
}
else
{
auto ts =
this
->timestamp_;
auto now =
clock
::now().time_since_epoch().count();
if
(now - ts <
this
->interval_count_) {
// 5.
return
false
;
}
this
->timestamp_ = now;
this
->count_ = 1;
return
true
;
}
}
|
这里, 。
(1) 表明频率限制器使用单位为毫秒的时钟。在实际使用时,也可以按需改成微妙甚至纳秒.
(2) 使用mutable修饰内部状态count_和timestamp_。这是因为两个limit_和interval_相同的频率限制器,在逻辑上是等价的,但他们的内部状态却不一定相同。因此,为了让const成员函数能够修改内部状态(而不改变逻辑等价),我们要给内部状态数据成员加上mutable修饰.
(2.a) 处将count_设置为类型可表示的最大值是为了让 。
(4) 的判断失败,而对timestamp_进行初始化.
(2.b) 处将timestamp_设置为0则是基于同样的原因,让 (5) 的判断失败.
(2.a) 和 (2.b) 处通过巧妙的初值设计,将初始化状态与后续正常工作状态的逻辑统一了起来,而无须丑陋的判断.
(3) 禁止了对象的拷贝和移动。这是因为一个频率限制器应绑定一组操作,而不应由两组或更多组操作共享(对于拷贝的情形),或是中途失效(对于移动的情形).
如此一来,函数调用运算符就忠实地实现了前述逻辑.
多线程改造 。
第一步改造 。
当有多线程同时调用RateLimiter::operator()时,显而易见,在count_和timestamp_上会产生竞争。我们有两种办法解决这个问题:要不然加锁,要不然把count_和timestamp_设为原子变量然后用原子操作解决问题。于是,对函数调用运算符,我们有如下第一步的改造.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class
RateLimiter {
// 其余保持不变
private
:
mutable
std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()};
// 1.a.
mutable
std::atomic<
clock
::rep> timestamp_{0};
// 1.b.
// 其余保持不变
};
bool
RateLimiter::operator()()
const
{
auto orig_count =
this
->count_.fetch_add(1UL);
// 2.
if
(orig_count <
this
->limit_) {
return
true
;
}
else
{
auto ts =
this
->timestamp_.load();
// 3.
auto now =
clock
::now().time_since_epoch().count();
if
(now - ts <
this
->interval_count_) {
return
false
;
}
this
->timestamp_.store(now);
// 4.
this
->count_.store(1UL);
// 5.
return
true
;
}
}
|
这里, 。
count_
和timestamp_
声明为原子的,从而方便后续进行原子操作。这样看起来很完美,但其实是有 bug 的。我们重点关注 (4) 这里。(4) 的本意是更新timestamp_,以备下一次orig_count >= this->limit_时进行判断。准确设置这一timestamp是频率限制器正确工作的基石。但是,如果有两个(或更多)线程,同时走到了 (4)会发生什么?
timestamp_
的值究竟是什么,我们完全不可预期。count_
置为1
。但是你想,频率限制器先后放行了两次操作,但为什么count_
是1
呢?这是不是就「偷跑」了一次操作?为此,我们要保证只有一个线程会真正设置timestamp_,而拒绝其他同样走到 (4) 位置的线程的操作,以避免其重复设置timestamp_以及错误地将count_再次置为1.
第二步改进 。
于是有以下改进.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
bool
RateLimiter::operator()()
const
{
auto orig_count =
this
->count_.fetch_add(1UL);
// 3.
if
(orig_count <
this
->limit_) {
// 4.
return
true
;
}
else
{
auto ts =
this
->timestamp_.load();
auto now =
clock
::now().time_since_epoch().count();
if
(now - ts <
this
->interval_count_) {
// 5.
return
false
;
}
if
(not
this
->timestamp_.compare_and_exchange_strong(ts, now)) {
// 1.
return
false
;
}
this
->count_.store(1UL);
// 2.
return
true
;
}
}
|
这里,(1) 是一个 CAS 原子操作。它会原子地比较timestamp_和ts的值(Compare):若他们相等,则将now的值写入timestamp_(Swap),并返回true;若他们不相等,则将timestamp_的值写入ts,并返回false。如果没有其他线程抢先修改timestamp_的值,那么 CAS 操作应该成功并返回true,继续执行后面的代码;否则,说明其他线程已经抢先修改了timestamp_,当前线程的操作被记入上一个周期而被频率限制器拒绝.
现在要考虑 (2)。如果执行完 (1) 之后立即立刻马上没有任何延迟地执行 (2),那么当然一切大吉。但如果这时候当前线程被切出去,会发生什么?我们要分情况讨论.
如果ts == 0,也就是「当前线程」是频率限制器第一次修改timestamp_。于是,当前线程可能会在 (3) 处将count_(溢出地)自增为0,从而可能有其他线程通过 (4)。此时,当前线程在当前分片有可能应当被拒绝操作。为此,我们需要在 (1) 和 (2) 之间做额外的判断.
1
2
3
4
|
if
(ts == 0) {
auto orig_count =
this
->count.fetch_add(1UL);
return
(orig_count <
this
->limit_);
}
|
如果ts != 0,也就是「当前线程」并非频率限制器第一次修改timestamp_。那么其他线程在 (4) 处必然判断失败,但在 (5) 处的判断可能成功,从而可能继续成功执行 (1),从而接连两次执行 (2)。但这不影响正确性。因为通过 (5) 表明相对当前线程填入的timestamp_,已经由过了一个时间段(interval),而在这个时间段里,只有当前线程的一次操作会被接受.
第三次改进 。
由此,我们得到:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
bool
RateLimiter::operator()()
const
{
auto orig_count =
this
->count_.fetch_add(1UL);
if
(orig_count <
this
->limit_) {
return
true
;
}
else
{
auto ts =
this
->timestamp_.load();
auto now =
clock
::now().time_since_epoch().count();
if
(now - ts <
this
->interval_count_) {
return
false
;
}
if
(not
this
->timestamp_.compare_and_exchange_strong(ts, now)) {
return
false
;
}
if
(ts == 0) {
auto orig_count =
this
->count.fetch_add(1UL);
return
(orig_count <
this
->limit_);
}
this
->count_.store(1UL);
return
true
;
}
}
|
至此,我们的代码在逻辑上已经成立了,接下来要做一些性能方面的优化.
原子操作默认采用std::memory_order_seq_cst的内存模型。这是 C++ 中最严格的内存模型,它有两个保证:
为了实现顺序一致性(sequential consistency),编译器会使用很多对抗编译器优化和 CPU 乱序执行(Out-of-Order Execution)的手段,因而性能较差。对于此处的count_,我们无需顺序一致性模型,只需要获取释放模型(Aquire-Release)模型就足够了.
第四次改进 。
于是有第四次改进:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
bool
RateLimiter::operator()()
const
{
auto orig_count =
this
->count_.fetch_add(1UL, std::memory_order_acq_rel);
if
(orig_count <
this
->limit_) {
return
true
;
}
else
{
auto ts =
this
->timestamp_.load();
auto now =
clock
::now().time_since_epoch().count();
if
(now - ts <
this
->interval_count_) {
return
false
;
}
if
(not
this
->timestamp_.compare_and_exchange_strong(ts, now)) {
return
false
;
}
if
(ts == 0) {
auto orig_count =
this
->count.fetch_add(1UL, std::memory_order_acq_rel);
return
(orig_count <
this
->limit_);
}
this
->count_.store(1UL, std::memory_order_release);
return
true
;
}
}
|
至此,我们就完整实现了一个频率限制器,可以愉快地开始拉 QPS 压测了! 。
总结 。
到此这篇关于在 C++ 中实现一个线程安全的频率限制器的文章就介绍到这了,更多相关在 C++ 中实现一个线程安全的频率限制器内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://liam.page/2020/05/17/Implementing-a-Thread-safe-Rate-Limiter-in-C/ 。
最后此篇关于C++ 实现线程安全的频率限制器(推荐)的文章就讲到这里了,如果你想了解更多关于C++ 实现线程安全的频率限制器(推荐)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我有一个包含 34 个变量和大约 25,000 个观测值的数据集。每个观察都涉及一个特定事件。它的格式如下: no id date .... 1 363 006
我已将 R 连接到 Twitter 并使用 R 中的 searchTwitter 函数进行抓取,并清除标点符号、小写字母等结果数据。现在我正在尝试执行以下操作: 计算自 2015 年 1 月 至 20
我正在研究项目,需要可视化频谱分析以设置一些精确参数。现在,我将垃圾箱转换为屏幕空间,因为在线性空间中,较低频率的幅度被压缩在一起。这是我在C++中的代码: float windowSize = 64
我正在尝试使用MATLAB导入WAV文件并创建如下所示的图表类型。我基本上是在尝试获取频率信息并根据分贝对其进行绘制。这是我正在使用的代码,但似乎无法正确提取频率信息: [x fs]=wavread(
我有一个 GUI,可以计算字符串中第一个字母的出现次数。我希望它以列格式计算所有字母,例如: 这是我到目前为止所拥有的: import java.awt.BorderLayout; import ja
我有一个由许多变量组成的全国调查,就像这个(为了简单起见,我省略了一些变量): year id y.b sex income married pens weight 2002
我被要求报告我们客户的联系频率,即每周、每月、每季度或每年看到多少客户。 当在论坛中讨论“频率”时,它们通常是指某个值在表中存在的次数。 我可以获得客户的联系人数量:- select A.cl
我正在尝试制作一款游戏,当麦克风发出足够响亮的声音时,我的角色会射击(在 Unity 中)。但是我不知道如何开始。 感谢您的帮助! 最佳答案 您可以通过使用 AudioSource.GetOutput
尝试计算字符数并改进我的代码,我做了一些更改,而不是使用 while 循环。好奇是否有人对我如何改进我的代码以使其更专业且更便宜有任何建议? #include int countingCharact
我正在创建一个 MySQL 数据库,其中包含大量带有时间戳的条目。这些条目将附加到特定用户和另一个索引(例如博客作者和他的几个网站)。计算用户/全局每日条目图表的最佳方法是什么。 我的两种方法是使用
我创建了一项调查并将其发送出去。该调查要求用户提供电子邮件,然后要求他们从包含 8 个不同选项的下拉菜单中选择要吃哪顿饭。有些人使用同一封电子邮件多次填写调查,但食物选择不同。 我有一个如下所示的 M
我有一个 MySQL 数据库: Date Customer_ID 我怎样才能把它变成: Customer_ID | Count_Visits_Past_Week | Count_Visits_
对于非常大的数据集,如何使用 gnuplot 仅在第一个和最后一个数据点的 x 轴上放置标记/标签? 最佳答案 在 gnuplot 4.6 及更高版本中,您可以使用命令 stats 'data.dat
我正在寻找一种方法来为具有共同词根/含义的单个词生成数值概率值。 用户将使用“舞者”、“跳舞”、“跳舞”等词生成内容。 如果“dancer”被提交了 30 次,跳舞了 5 次,我只需要一个值“danc
给定一个包含如下内容的数据集: [2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 65, 75, 85, 86, 87,
我想将声音的音高绘制成图表。 目前我可以绘制幅度。下图是由 getUnscaledAmplitude() 返回的数据创建的: AudioInputStream audioInputStream = A
在 Javascript 中,我试图获取一个初始的数值数组并计算其中的元素。理想情况下,结果将是两个新数组,第一个指定每个唯一元素,第二个包含每个元素出现的次数。不过,我愿意接受有关输出格式的建议。
我正在编写一个多线程OpenMPI应用程序,使用来自多个线程的MPI_Isend和MPI_Irecv在InfiniBand RDMA的各个列之间每秒交换数百条消息。 传输量约为400-800KByte
这个站点上有很多问题,询问如何在给定频率下创建简单的正弦波。我想做的是获取阵列或列表或任何频率,然后连续连续播放它们(而不是和弦),听起来有点像旧PC扬声器。我尝试使用Console.Beep,但是它
我使用我的App捕获声音。假设此声音是正弦1 KHz声音,并且存在背景声音。如何识别此1 KHz声音出现在声音上? 我的意思是,我可以想象如何在图像中找到元素,例如,如果您要在图像上寻找黄色正方形,那
我是一名优秀的程序员,十分优秀!