- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
kafka生产者作为消息发送中很重要的一环,这里面可是大有文章,你知道生产者消息发送的流程吗?知道消息是如何发往哪个分区的吗?如何保证生产者消息的可靠性吗?如何保证消息发送的顺序吗?如果对于这些问题还比较模糊的话,那么很有必要看看这篇文章了,本文主要是基于kafka3.x版本讲解.
kafka生产者最重要的就是消息发送的整个流程,我们来看下究竟是怎么一回事把.
在消息发送的过程中,涉及到了两个线程—— main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator 。 main 线程将消息发送给 RecordAccumulator , Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker .
kafkaProducer
创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator
, 也称为消息收集器)中。 Sender
线程负责从 RecordAccumulator
获取消息并将其发送到 Kafka
中。 RecordAccumulator
主要用来缓存消息以便 Sender
线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator
缓存的大小可以通过生产者客户端参数 buffer.memory
配置,默认值为 33554432B
,即 32M
。 RecordAccumulator
的某个双端队列( Deque
)中, RecordAccumulator
内部为每个分区都维护了一个双端队列,即 Deque<ProducerBatch>
, 消息写入缓存时,追加到双端队列的尾部。 Sender
读取消息时,从双端队列的头部读取。 ProducerBatch
是指一个消息批次;与此同时,会将较小的 ProducerBatch
凑成一个较大 ProducerBatch
,也可以减少网络请求的次数以提升整体的吞吐量。 ProducerBatch
大小可以通过 batch.size
控制,默认 16kb
。 Sender
线程会在有数据积累到 batch.size
,默认16kb,或者如果数据迟迟未达到 batch.size
, Sender
线程等待 linger.ms
设置的时间到了之后就会获取数据。 linger.ms
单位 ms
,默认值是 0ms
,表示没有延迟。 Sender
从 RecordAccumulator
获取缓存的消息之后,会将数据封装成网络请求 <Node,Request>
的形式,这样就可以将 Request
请求发往各个 Node
了。 sender
线程发往 Kafka
之前还会保存到 InFlightRequests
中,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求。 InFlightRequests
默认每个分区下最多缓存5个请求,可以通过配置参数为 max.in.flight.request.per. connection
修改。 Request
通过通道 Selector
发送到 kafka
节点。 acks
. Leader
收到数据后应答。 Request
请求接受到kafka的响应结果,如果成功的话,从 InFlightRequests
清除请求,否则的话需要进行重发操作,可以通过配置项 retries
决定,当消息发送出现错误的时候,系统会重发消息。 retries
表示重试次数。默认是 int 最大值, 2147483647
。 RecordAccumulator
中的数据。 现在我们来看看kafka生产者中常用且关键的配置参数.
bootstrap.servers
生产者连接集群所需的 broker 地 址 清 单 。 例 如 hadoop102:9092,hadoop103:9092,hadoop104:9092 ,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息.
key.serializer
和 value.serializer
指定发送消息的 key 和 value 的序列化类型。一定要写全类名.
buffer.memory
RecordAccumulator 缓冲区总大小,默认 32m.
batch.size
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加.
linger.ms
如果数据迟迟未达到 batch.size ,kafka等待这个时间之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间.
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 lMB 一般情况下,这个默认值就可以满足大多数的应用场景了.
compression.type
这个参数用来指定消息的压缩方式,默认值为“ none ",即默认情况下,消息不会被压缩。该参数还可以配置为 " gzip "," snappy " 和 " lz4 "。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩; 。
acks
acks 的值为0,1和-1或者all.
Producer
往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。 Producer
往集群发送数据只要 Leader
成功写入消息就可以发送下一条,只确保 Leader
接收成功。 Producer
往集群发送数据需要所有的 ISR Follower
都完成从 Leader
的同步才会发送下一条,确保Leader 发送
成功和所有的副本都成功接收。安全性最高,但是效率最低。 max.in.flight.requests.per.connection
允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字.
retries
和 retry.backoff.ms
当消息发送出现错误的时候,系统会重发消息。 retries 表示重试次数。在kafka3.4.0默认是 int 最大值, 2147483647 。如果设置了重试,还想保证消息的有序性,需要设置 max.in.flight.requests.per.connection =1否则在重试此失败消息的时候,其他的消息可能发送成功了。另外 retry.backoff.ms 控制两次重试之间的时间间隔,默认是 100ms.
更多kafka生产者的配置可以查阅官网 https://kafka.apache.org/documentation/#producerconfigs .
通常情况下,生产者发送消息分为以下4个步骤
(1)配置生产者客户端参数及创建相应的生产者实例 。
(2)构建待发送的消息 。
(3)发送消息 。
(4)关闭生产者实例 。
我们直接上代码.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.0</version>
</dependency>
public static void main(String[] args) {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first", Integer.toString(i), "hello " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
ProducerRecord
kafka发送时主要构造出 ProducerRecord 对象,包含发送的主题,partition,key,value等.
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
kafka提供了3种发送消息的模式,发后即忘,同步发送和异步发送,我们直接上代码.
fire-and-forget
) 发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。 在大多数情况下,这种发送方式没有问题。 不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。 这种发送方式的性能最高,可靠性最差.
Future<RecordMetadata> send = producer.send(rcd);
sync
****) 只需在上面种发送方式的基础上,再调用一下 get()方法即可,该方法时阻塞的.
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
async
****) 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception ,如果 Exception 为 null ,说明消息发送成功,如果 Exception 不为 null ,说明消息发送失败.
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试.
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first", Integer.toString(i), "hello " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
}
kafka设计上存在分区的,它有下面两个好处:
Partition
在一个 Broker
上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台 Broker
上。合理控制分区的任务,可以实现负载均衡的效果。 那究竟生产者是按照什么样的策略发往到不同的分区呢?
根据生产者的发送流程,其中会经过分区器,默认情况下是使用 DefaultPartitioner ,具体逻辑如下:
kafka 发送消息的时候构造消息对象 ProducerRecord ,可以传入指定的 partition , 那么消息就会发送这个指定的分区。例如partition=0,所有数据写入分区0.
// 发送消息到0号分区
kafkaProducer.send(new
ProducerRecord<>("first", 0, Integer.toString(i), "hello " + i));
partition
值但有 key
的情况下,将 key
的 hash
值与 topic
的 partition
数进行取余得到 partition
值; 例如: key1 的 hash 值=5, key2 的 hash 值=6 , topic 的 partition 数=2,那么 key1 对应的 value1 写入1号分区, key2 对应的 value2 写入0号分区.
partition
值又没有 key
值的情况下,Kafka采用 Sticky Partition
(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch
已满或者已完成, Kafka
再随机一个分区进行使用(和上一次的分区不同)。 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者 linger.ms 设置的时间到, Kafka 再随机一个分区进行使用(如果还是0会继续随机).
如果默认的分区规则不满足需求,我们也可以自定义一个分区器。比如我们实现一个分区器实现,发送过来的数据中如果包含 alvin ,就发往 0 号分区,不包含 alvin ,就发往 1 号分区.
Partitioner
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 创建 partition
int partition;
// 判断消息是否包含 alvin
if (msgValue.contains("alvin")){
partition = 0;
}else {
partition = 1;
}
// 返回分区号
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.alvin.kafka.producer.MyPartitioner");
// 发送消息 略~~
对比着前面kafka生产者的发送流程,kafka生产者提供的一些配置参数可以有助于提高生产者的吞吐量.
参数名称 | 描述 |
---|---|
buffer.memory |
RecordAccumulator 缓冲区总大小,默认 32m。适当增加该值,可以提高吞吐量。 |
batch.size |
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms |
如果数据迟迟未达到 batch.size , sender 线程等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
compression.type |
指定消息的压缩方式,默认值为“ none ",即默认情况下,消息不会被压缩。该参数还可以配置为 " gzip "," snappy " 和 " lz4 "。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。 |
为了保证消息发送的可靠性, kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决 定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 producer 时通过 acks 参数指定.
生产者发送过来的数据,不需要等数据落盘应答.
生产者发送过来的数据, Leader 收到数据后应答.
生产者发送过来的数据, Leader 和 ISR 队列里面的所有节点收齐数据后应答.
ISR 概念:(同步副本)。每个分区的 leader 会维护一个 ISR 列表, ISR 列表里面就是 follower 副本 的 Borker 编 号 , 只 有 跟 得 上 Leader 的 follower 副 本 才 能 加 入 到 ISR 里 面 , 这 个 是 通 过 replica.lag.time.max.ms =30000(默认值)参数配置的,只有 ISR 里的成员才有被选为 leader 的可能.
如果 Leader 收到数据,所有 Follower 都开始同步数据,但有一个 Follower ,因为某种故障,迟迟不能与 Leader 进行同步,那这个问题怎么解决呢?
Leader 维护了一个动态的 in-sync replica set ( ISR ),意为和 Leader 保持同步的 Follower+Leader 集合 (leader:0,isr:0,1,2) 。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR 。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s .
小结:数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2.
acks=0
,生产者发送过来数据就不管了,可靠性差,效率高; acks=1
,生产者发送过来数据 Leader
应答,可靠性中等,效率中等; acks=-1或者all
,生产者发送过来数据 Leader
和 ISR
队列里面所有 Follwer
应答,可靠性高,效率低; 在生产环境中, acks=0 很少使用; acks=1 ,一般用于传输普通日志,允许丢个别数据; acks=-1 ,一般用于传输和钱相关的数据,对可靠性要求比较高的场景.
kafka作为分布式消息系统,难免会出现重复消息或者丢消息的情况,会存在3种数据传递语义.
ack级别设置为0, 可以保证数据不重复,但是不能保证数据不丢失, 所以叫做最多一次.
ack级别设置为-1 + 分区副本大于等于2 + ISR 里应答的最小副本数量大于等于2可能会出现至少一次的消息。比如下图中在发送过程Leader节点宕机,消息就会重试,就有可能出现消息的重复.
At Least Once 可以保证数据不丢失,但是不能保证数据不重复.
对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。这在kafka中可以通过 幂等性和事务 的特性实现.
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) .
幂等性,简单来说,就是一个操作重复做,每次的结果都一样。开启幂等性功能,参数 enable.idempotence 设置为 true即可,在3.x版本中默认情况下也是true。具体实现原理如下:
producer
在初始化时会生成一个 producer_id
,并为每个目标 partition
维护一个“序列号”。 producer
每发送一条消息,会将< producer_id
,分区>对应的“序列号”加 1。 broker
服务端端会为每一对 <producer_id,分区>
维护一个序列号,对于每收到的一条消息,会判断服务端 的 SN_old
和接收到的消息中的 SN_new
进行对比: SN_OLD+1
= SN_NEW
,正常情况 SN_old+1
> SN_new
,说明是重复写入的数据,直接丢弃 SN_old+1
< SN_new
,说明中间有数据尚未写入,或者是发生了乱序,或者是数据丢失,将抛出严重异常: OutOfOrderSequenceException
。 根据前面的生产者发送流程可以知道,要想保证消息投递的顺序性:
max.in.flight.requests.per.connection=1
max.in.flight.requests.per.connection
需要设置为1。 max.in.flight.requests.per.connection
需要设置小于等于5。 因为在kafka1.x以后,启用幂等后,kafka服务端会缓存 producer 发来的最近5个 request 的元数据,故无论如何,都可以保证最近5个 request 的数据都是有序的.
本文总结了kafka生产者整个消息发送的流程,只有明白了这个流程以后,那么我们对于一些生产者消息发送的一些问题才有更加深刻的理解.
欢迎关注个人公众号【JAVA旭阳】交流学习 。
最后此篇关于kafka生产者你不得不知的那些事儿的文章就讲到这里了,如果你想了解更多关于kafka生产者你不得不知的那些事儿的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我在 Windows 机器上启动 Kafka-Server 时出现以下错误。我已经从以下链接下载了 Scala 2.11 - kafka_2.11-2.1.0.tgz:https://kafka.ap
关于Apache-Kafka messaging queue . 我已经从 Kafka 下载页面下载了 Apache Kafka。我已将其提取到 /opt/apache/installed/kafka
假设我有 Kafka 主题 cars。 我还有一个消费者组 cars-consumers 订阅了 cars 主题。 cars-consumers 消费者组当前位于偏移量 89。 当我现在删除 cars
我想知道什么最适合我:Kafka 流或 Kafka 消费者 api 或 Kafka 连接? 我想从主题中读取数据,然后进行一些处理并写入数据库。所以我编写了消费者,但我觉得我可以编写 Kafka 流应
我曾研究过一些 Kafka 流应用程序和 Kafka 消费者应用程序。最后,Kafka流不过是消费来自Kafka的实时事件的消费者。因此,我无法弄清楚何时使用 Kafka 流或为什么我们应该使用
Kafka Acknowledgement 和 Kafka 消费者 commitSync() 有什么区别 两者都用于手动偏移管理,并希望两者同步工作。 请协助 最佳答案 使用 spring-kafka
如何在 Kafka 代理上代理 Apache Kafka 生产者请求,并重定向到单独的 Kafka 集群? 在我的特定情况下,无法更新写入此集群的客户端。这意味着,执行以下操作是不可行的: 更新客户端
我需要在 Kafka 10 中命名我的消费者,就像我在 Kafka 8 中所做的一样,因为我有脚本可以嗅出并进一步使用这些信息。 显然,consumer.id 的默认命名已更改(并且现在还单独显示了
1.概述 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现
我正在使用以下命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.topic --property
我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者已将数据写入/已经将数据写入各种主题。我想与服务
我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door, house.room ) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了
有没有办法以编程方式获取kafka集群的版本?例如,使用AdminClient应用程序接口(interface)。 我想在消费者/生产者应用程序中识别 kafka 集群的版本。 最佳答案 目前无法检索
每当我尝试重新启动 kafka 时,它都会出现以下错误。一旦我删除/tmp/kafka-logs 它就会得到解决,但它也会删除我的主题。 有办法解决吗? ERROR Error while
我是 Apache Kafka 的新用户,我仍在了解内部结构。 在我的用例中,我需要从 Kafka Producer 客户端动态增加主题的分区数。 我发现了其他类似的 questions关于增加分区大
正如 Kafka 文档所示,一种方法是通过 kafka.tools.MirrorMaker 来实现这一点。但是,我需要将一个主题(比如 测试 带 1 个分区)(其内容和元数据)从生产环境复制到没有连接
我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。 但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。 我将 spring.kafka.bo
我的 kafka sink 连接器从多个主题(配置了 10 个任务)读取,并处理来自所有主题的 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。 以下是触发器记录中键值对的示例: "
我有以下 kafka 流代码 public class KafkaStreamHandler implements Processor{ private ProcessorConte
当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)去恢复正常
我是一名优秀的程序员,十分优秀!