- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
本文围绕一个常见的使用场景深入分析在高吞吐场景下,使用Pulsar客户端收发消息可能会遇到的若干问题。并以此为切入点,梳理一下Pulsar客户端在内存控制上所做的优化改进.
假设这样一个常见的场景,一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点,以及有针对性的优化搜索效果等。于是,我们有下面这段逻辑,简化后如下:
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("search-activities")
.create();
try {
MessageId messageId = producer.send(/* message payload here */);
log.debug("Search activity messageId={}", messageId);
} catch (Exception e) {
log.error("Failed to record search activity", e);
}
注意pulsarClient和producer均支持复用,并且推荐这么做,这里只是为了演示写到了一起.
producer.send是阻塞方式发送消息,换句话说就是线程会卡在这里等待发送结果返回。在现实中可以根据消息在实际业务中的需要选择阻塞和非阻塞两种方式,例如这里我们的业务是在用户发起一次搜索请求时记录搜索请求和上下文信息,业务上对搜索请求事件并无强依赖,因此这里使用阻塞方式发消息就不太适合了,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性,总的来说,要区分支线流程和主线流程,不应该把支线流程全部嵌套在主线流程中.
于是,我们可以优化一下,调整为非阻塞方式,将记录搜索事件放到其它线程中完成:
producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {
if (ex != null) {
log.error("Failed to record search activity", ex);
} else {
log.debug("Search activity messageId={}", msgId);
}
});
在现实中,若用户搜索的TPS较高,例如在单实例上可以超过1000QPS(高和低都是相对而言的,这里只是举个例子)。若恰好记录的搜索事件内容较多(例如包含了搜索请求的完整上下文和搜索结果等),序列化之后大小能达到100KB甚至1MB,那么上面代码在运行时你可以会遇到MemoryBufferIsFullError异常:
org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:972)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)
at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)
另外若服务本身与Pulsar的broker之间出现了网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,你还可能会遇到ProducerQueueIsFullError异常:
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:965)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)
at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)
下面我们对上面两种异常产生的原因作一下分析,我们先来看一下构建Producer时,ProducerBuilder中与内存使用有关的配置项:
/*
* Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
*/
ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);
/*
* Set the number of max pending messages across all partitions.
*/
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
maxPendingMessages用来控制producer内部队列中正在发送还没有接收到broker确认的消息数量,若队列大小超出了这个限制,默认的行为就是抛出ProducerQueueIsFullError异常,你可以通过修改另外一个配置blockIfQueueFull=true调整为阻塞等待队列中空出新的空间,这里还有另外需要注意的地方在下面会细说.
maxPendingMessages这个配置实际上是直接传递给底层各个分区的内部producer的,对于多分区的topic,实际处于pending状态的最大消息数量是maxPendingMessages乘以topic分区数量。由于maxPendingMessages结合可变的topic分区数量使得最终的pending消息数量变得不可控,因此还有另外一个优先级更高的配置maxPendingMessagesAcrossPartitions用来控制整个topic所有分区的总的一个pending消息数量,最终到各个分区内部producer取maxPendingMessages和maxPendingMessagesAcrossPartitions / partitions的较小值.
然而,在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用,开发者需要预估平均消息大小,这几乎不可能做到,因为消息的实际大小很可能会随着业务的变化而发生变化,因此在PIP-74中,在构建PulsarClient时,ClientBuilder提供了一个面向整个client实例统一的内存限制配置:
/*
* Configure a limit on the amount of memory that will be allocated by this client instance.
*
* Setting this to 0 will disable the limit.
*/
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认则会抛出MemoryBufferIsFullError异常,若同时配置了blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成.
前面提到关于blockIfQueueFull配置的使用有一个细节需要注意,这个配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息,例如在一个后台定时任务的场景中批量发送消息。然而这里需要强调的是blockIfQueueFull一旦配置为true,不论是应用发送消息调用的是阻塞的Producer.send方法还是非阻塞的Producer.sendAsync方法都会出现阻塞等待,”卡“住当前线程,那么对于我们上面的业务来说这是不可接受的,若由于支线流程(特殊情况容忍丢失的用户搜索事件)异常抖动,阻塞了主线流程(搜索主线程)就得不偿失了.
// 注意:若producer发送队列满或者内存buffer满,当前线程将卡在sendAsync方法调用
producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {
if (ex != null) {
log.error("Failed to record search activity", ex);
} else {
log.debug("Search activity messageId={}", msgId);
}
});
PIP-120对2.10.0以及之后版本的客户端中,默认启用了memoryLimit配置,其默认值为64MB,同时默认禁用了maxPendingMessages和maxPendingMessagesAcrossPartitions配置(默认值修改为0),另外将maxPendingMessagesAcrossPartitions配置标记为了Deprecated,因为使用这个配置最终目的就是控制客户端producer的内存使用,现在已经有memoryLimit这个更加直接的配置可以替代了.
上面说的全部都是围绕着Producer侧的内存使用来讲的,其实在PIP-74中也提到了Pulsar客户端consumer侧的内存使用,只不过在实现中是分阶段进行的.
我们先来看一下Pulsar客户端的API早期在构造一个Consumer时,ConsumerBuilder提供的与内存使用有关的选项:
/*
* Sets the size of the consumer receive queue.
*
* (default: 1000)
*/
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
/*
* Sets the max total receiver queue size across partitions.
*
* (default: 50000)
*/
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用Consumer#receive或者Consumer#receiveAsync方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。上面两个选项是给这个”空间“设置一个数量上的上限,注意这里仅是数量上的上限,实际的内存空间使用还要取决于平均消息大小。receiverQueueSize控制每个分区consumer的接收队列大小,maxTotalReceiverQueueSizeAcrossPartitions来控制所有分区consumer和parent consumer的接收队列总大小.
前面提到receiverQueueSize和maxTotalReceiverQueueSizeAcrossPartitions参数是以数量的形式间接的控制Consumer预接收队列的内存使用,在[PIP-74][pip-74]中提出了整个client级别的memoryLimit,同时提出了一个新的控制Consumer内存使用的方案,就是autoScaledReceiverQueueSizeEnabled
/*
* If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default,
* and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client
* memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
*/
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
当启用了这个特性之后,receiverQueueSize会从1开始呈2的指数倍增长,直至达到receiverQueueSize的限制或达到client的memoryLimit限制,其目标是在有限制的内存使用下,达到最大的吞吐效率.
除了上面说的Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时ackTimeout和ackTimeoutTickTime的配置如果不匹配,会消耗较多堆内内存.
/*
* Sets the timeout for unacknowledged messages, truncated to the nearest millisecond. The timeout must be greater than 1 second.
*/
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/**
* Define the granularity of the ack-timeout redelivery.
*
* <p>By default, the tick time is set to 1 second. Using a higher tick time
* reduces the memory overhead to track messages when the ack-timeout is set to
* bigger values (e.g., 1 hour).
*/
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
若Consumer配置了ackTimeout并且配置了较大的时间窗口(例如1小时或者更长)时,应适当的调大ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若ackTimeout时间窗口很大,ackTimeoutTickTime仍然使用其默认值1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码UnAckedMessageTracker.java.
sendAsync
非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull
之后,它会在特定情况下演变成阻塞方法。本文最先发表于: https://aops.io/article/pulsar-client-memory-control.html 。
作者 萧易客 一线深耕消息中间件,RPC框架多年,欢迎评论区或通过邮件交流.
微信公众号: 萧易客 。
github id: shawyeok 。
最后此篇关于Pulsar客户端如何控制内存使用的文章就讲到这里了,如果你想了解更多关于Pulsar客户端如何控制内存使用的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
大家好,我完成了这个基本的 C 程序,它向输入任何给定数字集的用户显示有序集、最小值、最大值、平均值和中值。我遇到的问题是,当我打印数字时,我必须使用诸如“3.2%f”之类的东西来设置标准的精度,我怎
我有这个基于 Python 的服务守护进程,它正在执行大量多路复用 IO(选择)。 从另一个脚本(也是 Python)我想查询这个服务守护进程的状态/信息和/或控制处理(例如暂停它、关闭它、更改一些参
我读到 Fortran 对表达式求值的顺序有严格的规则。对于某些数值算法来说,这一点非常重要。 数值 C 程序如何控制浮点运算的顺序并防止编译器“优化”到不需要的运算顺序,例如将 (a*b)*c 更改
上下文: 整个问题可以概括为我正在尝试复制调用system(或fork)的行为,但在 mpi 环境中。 (事实证明,你不能并行调用system。)这意味着我有一个程序在许多节点上运行,每个节点上有一个
我考虑过控制scanf来接受c中的任何输入。我的概念是等待10秒(或任何其他时间)来接受任何输入。10秒后它将退出并且不再接收任何输入。 int main(){ int a,b,c,d; sca
我正在尝试使用生成器停止 setTimeOut 上的执行流程。我究竟做错了什么?我无法让 console.log 每 1500 毫秒退出一次。我是 node 的新手,如果我在做一件非常愚蠢的事情,请不
我希望我的应用程序的 Activity 堆栈包含同一 Activity 的多个实例,每个实例处理不同的数据。因此,我将让 Activity A 在我的 Activity 堆栈中处理数据 a、b、c 和
我有这个 bash 文件,它向设备询问 OpenSSH 的 IP、密码等。 现在,如果我使用 ssh root@ip,我必须输入密码。这真的很烦人。第二;我不能让我的脚本向它发送命令。 这就是我想要的
我正在尝试测试我有权访问的机器的缓存属性。为此,我正在尝试读取内存并对其计时。我改变工作集大小和步幅访问模式以获得不同的测量值。 代码如下所示: clock1 = get_ticks() for (i
我正在尝试编写一个 makefile 来替换用于构建相当大的应用程序的脚本之一。 当前脚本一次编译一个文件,使用 make 的主要原因是并行化构建过程。使用 make -j 16 我目前在办公室服务器
我正在制作一个小的测试程序,它演示了一个粗糙的控制台界面。 该程序是一个低于标准的典型获取行、响应程序,它甚至不识别“退出”,并希望您通过按 control-c 强制退出。在 Mingw32 上完成。
好的,我有一个 VOIP 电话。我知道电话的 IP 地址和端口,并且可以完全访问电话,我正在使用它通过 SIP 中继调用 SIP 电话。 我基本上想随时查看手机上发生的事情,但我不知道从哪里开始。 如
是否可以指定 CWinApp::WriteProfileString() 使用的应用程序名称? 如果我使用 CWinApp::SetRegistryKey 将我的公司名称设置为“MyCompany”,
我正在尝试用 Python 控制 Tor。我在 stackoverflow 上阅读了其他几个关于这个主题的问题,但没有一个能回答这个问题。 我正在寻找一种方法,以便在命令运行时为您提供“新身份”、新
最近在做一个项目,涉及到iPhone设备和手表传输数据、控制彼此界面跳转,在网上找了很多资料,发现国内的网站这方面介绍的不多,而国外的网站写的也不是很全,所以在这写这篇文章,给大家参考一下,望大神指
我想增加图中值的范围。在示例中,值的范围从 50 到 200。但是,我需要按如下方式分配值:50 75 100 125 150 175 200 并且最好使用 scale_fill_gradientn
我有一个IconButton,当按下时波纹效果是圆形的并且比按钮的面积大,我怎样才能减少点击按钮时波纹效果的大小? IconButton( constraints
我正在使用代码契约(Contract)为我的项目生成附属程序集。基本上它为项目的 MyAssembly.dll 创建一个 MyAssembly.Contracts.dll。这应该放在你的程序集旁边,但
我想使用分面绘制图形,其中面板之间的边缘不同。面板按字母顺序自动排序(按照 ggplot 中的惯例)。一个简单的例子: library(igraph) library(ggraph) g <- mak
我想为我的 Android 应用程序创建一个小部件,以显示有关位置的一些实时详细信息,例如天气。但我想在任何时候允许最多 3 个小部件实例,每个实例都有不同的位置。我不确定该怎么做,也找不到任何信息。
我是一名优秀的程序员,十分优秀!