java - How to send batched data with a Spring Kafka producer

Repost. Author: 行者123. Updated: 2023-11-30 01:44:52

Currently I have code like this:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for (Pet p : myData) {
    // Serialize each Pet to JSON and send it as its own record.
    String json = objectWriter.writeValueAsString(p);
    kafkaTemplate.send(topic, json);
}

So every list item is sent one at a time. How can I send the whole list at once?

Best answer

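There is no direct way to send a batch of messages to Kafka with either KafkaTemplate or KafkaProducer. Neither has a method that accepts a List of objects and sends each one individually to different partitions.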

How does the Kafka producer send messages to Kafka?

KafkaProducer

The Kafka producer accumulates records into a batch and then sends the whole batch at once. The KafkaProducer documentation explains:

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
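The buffer-plus-background-thread design described above can be pictured with a toy model. This is only a sketch using the Java standard library, not Kafka's actual implementation; the class AsyncSendSketch and its members are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model (hypothetical, not Kafka code): send() only enqueues,
// a background thread does the "transmission".
public class AsyncSendSketch {

    /** A buffered record plus the future completed once it has been "transmitted". */
    static final class Pending {
        final String value;
        final CompletableFuture<String> ack = new CompletableFuture<>();
        Pending(String value) { this.value = value; }
    }

    private final BlockingQueue<Pending> buffer = new LinkedBlockingQueue<>();
    private final List<String> transmitted = new CopyOnWriteArrayList<>();

    AsyncSendSketch() {
        // Stand-in for the producer's background I/O thread.
        Thread ioThread = new Thread(() -> {
            try {
                while (true) {
                    Pending p = buffer.take();   // blocks until a record is buffered
                    transmitted.add(p.value);    // pretend this is the network write
                    p.ack.complete(p.value);     // signal completion to the caller
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Adds the record to the buffer and returns immediately. */
    CompletableFuture<String> send(String value) {
        Pending p = new Pending(value);
        buffer.add(p);
        return p.ack;
    }

    /** Snapshot of the records transmitted so far, in transmission order. */
    List<String> transmitted() {
        return new ArrayList<>(transmitted);
    }

    public static void main(String[] args) {
        AsyncSendSketch producer = new AsyncSendSketch();
        List<CompletableFuture<String>> acks = new ArrayList<>();
        for (String json : new String[] {"pet-1", "pet-2", "pet-3"}) {
            acks.add(producer.send(json));       // does not wait for transmission
        }
        acks.forEach(CompletableFuture::join);   // wait only when the result is needed
        System.out.println(producer.transmitted()); // prints [pet-1, pet-2, pet-3]
    }
}
```

The point of the sketch is that a loop of send() calls, like the one in the question, does not pay one network round trip per call: the calls only fill the buffer, and the caller waits only if it chooses to.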

Asynchronous send

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
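To make the size bound and the latency bound concrete, here is a minimal sketch of how the two limits could carve a stream of records into batches. It is a simplified model under stated assumptions, not Kafka's accumulator: the class BatchingSketch, the type TimedRecord and the method toBatches are hypothetical, arrival times are supplied by hand, and the latency bound is only checked when the next record arrives (the real producer sends on a timer and batches per partition).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical, not Kafka code) of size- and time-bounded batching.
public class BatchingSketch {

    /** A record value plus the time, in milliseconds, at which it was handed to the producer. */
    static final class TimedRecord {
        final String value;
        final long arrivalMs;
        TimedRecord(String value, long arrivalMs) {
            this.value = value;
            this.arrivalMs = arrivalMs;
        }
    }

    /**
     * Groups records into batches. A batch is closed when adding the next record
     * would exceed batchSizeBytes, or when the next record arrives more than
     * lingerMs after the first record of the current batch.
     */
    static List<List<String>> toBatches(List<TimedRecord> records, int batchSizeBytes, long lingerMs) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        long batchStartMs = 0;

        for (TimedRecord r : records) {
            int size = r.value.getBytes(StandardCharsets.UTF_8).length;
            boolean full = currentBytes + size > batchSizeBytes;
            boolean lingerExpired = r.arrivalMs - batchStartMs > lingerMs;
            if (!current.isEmpty() && (full || lingerExpired)) {
                batches.add(current);            // "send" the finished batch
                current = new ArrayList<>();
                currentBytes = 0;
            }
            if (current.isEmpty()) {
                batchStartMs = r.arrivalMs;      // the linger clock starts with the first record
            }
            current.add(r.value);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);                // flush whatever is left
        }
        return batches;
    }

    public static void main(String[] args) {
        List<TimedRecord> records = Arrays.asList(
                new TimedRecord("pet-1", 0),
                new TimedRecord("pet-2", 1),
                new TimedRecord("pet-3", 2),
                new TimedRecord("pet-4", 30));
        // 12-byte batches, 10 ms linger: the size bound closes the first batch,
        // the latency bound closes the second.
        System.out.println(toBatches(records, 12, 10));
        // prints [[pet-1, pet-2], [pet-3], [pet-4]]
    }
}
```

In the real producer the corresponding knobs are the batch.size and linger.ms settings: raising either tends to produce fewer, larger requests at the cost of some added latency.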

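Since you are using spring-kafka, you can send a List<Object>, but then you are sending one JSON array of JSON objects as a single record, rather than each JSON object as its own record to a topic partition.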

public KafkaTemplate<String, List<Object>> createTemplate() {

    // The factory's key and value types must match the template's type parameters.
    Map<String, Object> senderProps = producerProps();
    ProducerFactory<String, List<Object>> pf =
            new DefaultKafkaProducerFactory<>(senderProps);
    KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);
    return template;

}

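public Map<String, Object> producerProps() {

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    // batch.size: upper bound, in bytes, on a single batch of records.
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    // linger.ms: how long the producer waits for more records before sending a batch.
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    // buffer.memory: total bytes available for buffering records not yet sent.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // JsonSerializer writes the whole List as one JSON array per record.
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;

}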

KafkaTemplate<String, List<Object>> kafkaTemplate;

Regarding java - How to send batched data with a Spring Kafka producer, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58492689/
