gpt4 book ai didi

performance - Spring +卡夫卡: Transactions slow

转载 作者:行者123 更新时间:2023-12-03 09:01:38 27 4
gpt4 key购买 nike

刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0),但是当我添加事务时,处理速度降低了很多。

代码:

spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

在 Java 中我添加了:

@Bean
ProducerFactory<Object, Object> producerFactory(KafkaProperties properties) {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());
factory.setTransactionIdPrefix(properties.getProducer().getTransactionIdPrefix());
return factory;
}

@Bean
KafkaTemplate<Object, Object> kafkaTemplate(ProducerFactory<Object, Object> factory) {
return new KafkaTemplate<>(factory, true);
}

@Bean("kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory(Environment env, ConsumerFactory<Object, Object> consumerFactory, KafkaTransactionManager<Object, Object> transactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setAutoStartup(true);
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setTransactionManager(transactionManager);
factory.getContainerProperties().setGroupId(env.getRequiredProperty("spring.kafka.consumer.group-id"));
return factory;
}

当我删除setTransactionManager(transactionManager)语句后,速度大幅提高。我是不是做错了什么?

最佳答案

Kafka 事务非常昂贵 - 特别是如果您提交每次发送。

参见Transactions in Apache Kafka .

向下滚动到“事务如何执行以及如何调整它们”。

As we can see the overhead is independent of the number of messages written as part of a transaction. So the key to having higher throughput is to include a larger number of messages per transaction.

借助 Spring for Apache Kafka,您可以使用 executeInTransaction 在同一个事务中执行多次发送。方法。或者通过 KafkaTransactionManager 使用 Spring 事务管理并在 @Transactional 内执行多次发送方法。

编辑

我没有注意到监听器容器;我假设您正在使用一条消息,执行一些转换并发送到另一个主题。因此,在这种情况下,您不能“在事务中发送多条消息”,因为容器管理事务,并且默认情况下在每次传送后提交。

增加并发度不会影响事务语义;在您的情况下(并发数为 10),分区分布在 10 个线程中。每个线程运行一个单独的事务。

您可以通过设置 batchListener 进一步加快速度至true关于容器工厂。

在这种情况下,您的 @KafkaListener得到 List<ConsumerRecord> (或 List<Foo> 如果您使用转换);您可以迭代列表并处理每个记录并使用模板发送它(不要使用executeInTransaction,因为已经有一个由容器线程启动的事务)。然后,当批处理完成时,容器将提交事务。

您可以使用 kafka max.poll.records 控制批量大小消费者属性(property)。

关于performance - Spring +卡夫卡: Transactions slow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49754998/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com