- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
acks
参数设置为"all"Kafka消息丢失的原因通常涉及多个方面,包括生产者、消费者和Kafka服务端(Broker)的配置和行为。下面将围绕这三个关键点,详细探讨Kafka消息丢失的常见原因,并提供相应的解决方案和最佳实践。具体分析如下:
消息大小超过Broker的message.max.bytes的值。此时Broker会直接返回错误.
可以通过压缩消息体、去除不必要的字段等方式减小消息大小.
max.request.size,表示生产者发送的单个消息的最大值,也可以指单个请求中所有消息的总和大小。默认值为1048576B,1MB。这个参数的值值必须小于Broker的message.max.bytes.
Kafka生产者默认采用异步发送消息,如果未正确处理发送结果,可能导致消息丢失.
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。带有回调通知的 send 方法可以针对发送失败的消息进行重试处理.
生产者在发送消息时可能遇到网络抖动或完全中断,导致消息未能到达Broker。如果生产者的配置没有考虑这种情况,例如未设置恰当的重试机制(retries参数)和确认机制(acks参数),消息就可能在网络不稳定时丢失.
acks
参数设置为"all"acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用,该参数的配置具体如下:
使用同步发送方式或确保acks参数设置为"all",以确保所有副本接收到消息.
重试参数主要有retries和retry.backoff.ms两个参数.
(1)参数 retries是指生产者重试次数,该参数默认值为0.
消息在从生产者从发出到成功写入broker之前可能发生一些临时性异常,比如网络抖动、leader副本选举等,这些异常发生时客户端会进行重试,而重试的次数由retries参数指定。如果重试达到设定次数,生产者才会放弃重试并抛出异常。但是并不是所有的异常都可以通过重试来解决,比如消息过大,超过max.request.size参数配置的数值(默认值为1048576B,1MB)。如果设置retries大于0而没有设置参数max.in.flight.requests.per.connection(限制每个连接,也就是客户端与Node之间的连接最多缓存请求数)大于0则意味着放弃发送消息的顺序性.
使用retries的默认值交给使用方自己去控制,结果往往是不处理。所以通用设置建议设置如下:
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
该参数的设置已经在kafka 2.4版本中默认设置为Integer.MAX_VALUE;同时增加了delivery.timeout.ms的参数设置.
(2)参数retry.backoff.ms 用于设定两次重试之间的时间间隔,默认值为100.
避免无效的频繁重试。在配置retries和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试.
参数min.insync.replicas, 该参数控制的是消息至少被写入到多少个副本才算是 “真正写入”,该值默认值为 1,不建议使用默认值 1, 建议设置min.insync.replicas至少为2。 因为如果同步副本的数量低于该配置值,则生产者会收到错误响应,从而确保消息不丢失.
为了提升性能,Kafka 使用 Page Cache,先将消息写入 Page Cache,采用了异步刷盘机制去把消息保存到磁盘。如果刷盘之前,Broker Leader 节点宕机了,并且没有 Follower 节点可以切换成 Leader,则 Leader 重启后这部分未刷盘的消息就会丢失.
如果Broker的副本因子(replication.factor)设置过低,或者同步副本的数量(min.insync.replicas)设置不当,一旦Leader Broker宕机,选举出的新的Leader可能不包含全部消息,导致消息丢失.
这种场景下多设置副本数是一个好的选择,通常的做法是设置 replication.factor >= 3,这样每个 Partition 就会有 3个以上 Broker 副本来保存消息,同时宕机的概率很低.
同时配合设置上文提到的参数 min.insync.replicas至少为2(不建议使用默认值 1),表示消息至少要被成功写入到 2 个 Broker 副本才算是发送成功.
假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但 leader 的数据还有一些没有被 follower 副本同步的话,就会造成消息丢失.
参数unclean.leader.election.enable 的值说明如下:
该参数默认值为false。但如果为true的话,意味着非ISR集合中的副本也可以参加选举成为leader,由于不同步副本的消息较为滞后,此时成为leader的话可能出现消息不一致的情况。所以unclean.leader.election.enable 这个参数值要设置为 false.
同上文.
为了提高性能,减少刷盘次数, Kafka的Broker数据持久化时,会先存储到页缓存(Page cache)中, 。
按照一定的消息量和时间间隔进行进行批量刷盘的做法。数据在page cache时,如果系统挂掉,消息未能及时写入磁盘,数据就会丢失。Kafka没有提供同步刷盘的方式,所以只能通过增加副本或者修改刷盘参数提高刷盘频率来来减少这一情况.
kafka提供设置刷盘机制的参数如下:
log.flush.interval.messages 多少条消息刷盘1次,默认Long.MaxValue 。
log.flush.interval.ms 隔多长时间刷盘1次 默认null 。
log.flush.scheduler.interval.ms 周期性的刷盘。默认Long.MaxValue 。
官方不建议通过上述的刷盘3个参数来强制写盘。其认为数据的可靠性通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响.
同上文.
参数 enable.auto.commit 于设定是否自动提交offset,默认是true。代表消息会自动提交偏移量。但是提交偏移量后,消息处理失败了,则该消息丢失.
可以把 enable.auto.commit 设置为 false,这样相当于每次消费完后手动更新 Offset。不过这又会带来提交偏移量失败时,该消息复消费问题,因此消费端需要做好幂等处理.
如果消费端采用多线程并发消费,很容易因为并发更新 Offset 导致消费失败.
如果对消息丢失很敏感,最好使用单线程来进行消费。如果需要采用多线程,可以把 enable.auto.commit 设置为 false,这样相当于每次消费完后手动更新 Offset.
消费者如果处理消息的速度跟不上消息产生的速度,可能会导致消息堆积,进而触发消费者客户端的流控机制,从而遗失部分消息.
一般问题都出在消费端,尽量提高客户端的消费速度,消费逻辑另起线程进行处理.
消费者组 rebalance导致导致消息丢失的场景有两种: 1、某个客户端心跳超时,触发 Rebalance被踢出消费组。如果只有这一个客户端,那消息就不会被消费了。 2、Rebalance时没有及时提交偏移量,因为 Rebalance重新分配分区给消费者,所以如果在 Rebalance 过程中,消费者没有及时提交偏移量,可能会导致消息丢失.
提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等.
某些参数设置不当会导致重平衡频繁 ,严重影响消费速度,此时可以通过调整参数避免不必要的重平衡。 kafka rebalance所涉及的参数如下:
session.timeout.ms 该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置数值小,可以更早发现消费者崩溃的信息,从而更快地开启重平衡,避免消费滞后,但是这也会导致频繁重平衡,这要根据实际业务来衡量.
max.poll.interval.ms 于设定consumer两次poll的最大时间间隔(默认5分钟),如果超过了该间隔consumer client会主动向coordinator发起LeaveGroup请求,触发rebalance。根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance.
heartbeat.interval.ms 该参数跟 session.timeout.ms 紧密关联,前面也说过,只要在 session.timeout.ms 时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔的时间就是session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms 时间内有心跳.
max.poll.records 于设定每次调用poll()时取到的records的最大数,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题.
即使把参数都设置的很完善也会丢失消息的两种场景 。
当把数据写到足够多的PageCache的时候就会告知生产者现在数据已经写入成功,但如果还没有把PageCache的数据写到硬盘上,这时候PageCache所在的操作系统都挂了,此时就会丢失数据.
副本所在的服务器硬盘都坏了,也会丢数据.
总的来说,Kafka消息丢失是一个涉及多个环节的问题,需要从生产者、Broker和消费者三个层面综合考虑。通过合理的配置和策略,结合监控和及时的应对措施,可以大幅降低消息丢失的风险,确保数据在分布式系统中的可靠传递.
下图是本文内容总结的脑图:
最后 。
最后此篇关于剖析Kafka消息丢失的原因的文章就讲到这里了,如果你想了解更多关于剖析Kafka消息丢失的原因的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我对cassandra并使用1.2.10非常陌生。我有一个时间戳数据类型的主键列。现在,我正在尝试检索日期范围的数据。由于我们知道不能在cassandra中使用,因此我使用的是大于()来获取日期范围。
我正在尝试进行有条件的转场。但我得到: Terminating app due to uncaught exception 'NSInvalidArgumentException', reas
我有一个游戏项目,在调试和发布模式下在设备上运行得非常好。我有两个版本。旧版本和新版本具有更多(后来我添加了)功能,并且两者的 bundle ID、版本相同。当我构建旧版本时,之前没有安装“myGam
这个问题已经有答案了: 奥 git _a (2 个回答) 已关闭 5 年前。 我正在获取 ClassCastException 。这两个类来自不同的 jar,但是JettyContinuationPr
以下代码行抛出异常: HttpResponse response = client.execute(request); // actual HTTP request 我能够捕获它并打印: Log
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
public class TwoThreads { private static Object resource = new Object(); private static void
当我输入 6 (int) 作为值时,运行此命令会出现段错误 (gcc filename.c -lm)。请帮助我解决这个问题。预期的功能尚未实现,但我需要知道为什么我已经陷入段错误。 谢谢! #incl
所以,过去一周半我一直在研究这个 .OBJ/.MTL 网格解析器。在这段时间里,我一直在追踪/修复很多错误、清理代码、记录代码等等。 问题是,每修复一个错误,仍然会出现这个问题,而且一张图片胜过一千个
我正在运行一个代码,它基本上围绕 3 个维度旋转一个大数据数组(5000 万行)。但是,我遇到了一个奇怪的问题,我已将其缩小到如何评估旋转矩阵。基本上,对于除绕 x 轴以外的任何旋转,python 代
就在你说这是重复之前,我已经看到了其他问题,但我仍然想发布这个。 所以我正在阅读 Thinking in Java -Bruce Eckel 这篇文章是关于小写命名约定的: In Java 1.0 a
我想在我的应用程序中使用 REST API。它为我从这个应用程序发出的所有请求抛出 SocketTimeoutException。 Logcat 输出:(您也可以在此处看到带有漂亮格式的输出:http
我知道 raise ... from None 并已阅读 How can I more easily suppress previous exceptions when I raise my own
在未能找到各种Unix工具(例如xargs和whatnot)的最新独立二进制文件(this version很好,但需要外部DLL)后,我承担了自己进行编译的挑战。 ...这是痛苦的。 最终,尽管如此,
我有一个用PHP编写的流套接字服务器。 为了查看一次可以处理多少个连接,我用C语言编写了一个模拟器来创建1000个不同的客户端以连接到服务器。 stream_socket_accept几次返回fals
我的Android Studio昨天运行良好,但是今天当我启动Android Studio并想在移动设备上运行应用程序时,发生了以下错误, 我在互联网和stackoverflow上进行了搜索,但没有解
默认情况下,grails似乎为Java域对象的toString()返回:。那当然不是我想要的,所以我尝试@Override toString()返回我想要的。当我尝试grails generate-a
尝试通过LDAP通过LDAP对用户进行身份验证时,出现以下错误。 Reason: Cannot pass null or empty values to constructor. 谁能告诉我做错了什么
我正在尝试使用应用程序附带的 Houdini Python 模块,该模块是 Houdini 安装文件夹的一部分,位于标准 Python 路径之外。按照安装说明操作后,运行 Houdini Termin
简单地说,我正在为基本数据库编写单链表的原始实现。当用户请求打印索引下列出的元素高于数据库中当前记录数量时,我不断出现段错误,但仅当差值为 1 时。对于更高的数字,它只会触发我在那里编写的错误系统。
我是一名优秀的程序员,十分优秀!