gpt4 book ai didi

apache-kafka - 使用 enable.idempotence true 时,Spring Cloud Stream Kafka 应用程序的启动速度极慢

转载 作者:行者123 更新时间:2023-12-03 17:25:48 25 4
gpt4 key购买 nike

我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:

spring:
cloud:
function:
definition: myProducer1;myProducer2
stream:
bindings:
myproducer1-out-0:
destination: topic1
producer:
useNativeEncoding: true
myproducer2-out-0:
destination: topic2
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: false
retries: 10000
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}

它在大约 10 秒后开始:
 o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 11.288 seconds (JVM running for 11.868)

我需要我的生产者是幂等的,所以我设置了 enabled.idempotence: true .有了这个改变,启动时间慢了 7 倍(有时甚至超过 10 倍):
 o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 71.489 seconds (JVM running for 72.127)

如何加快启动速度?

更新:

我在启动过程中发现了一个问题 ( Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms. ), 有时它发生在其中一个生产者身上,其他人发生在两个生产者身上,其他人都没有发生 .当它没有出现时,启动速度和以前一样快。

在以下日志中,它仅发生在一个生产者中:
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864007183
org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 32029 with epoch 0

然后在 ProducerId set to 32029 with epoch 0 卡住 30 秒后,它记录了 Proceeding to force close... 的信息消息并初始化第二个生产者没有任何问题:
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Proceeding to force close the producer since pending 
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer1-out-0' has 1 subscriber(s).
o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: topic2
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
...
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864038612
org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to 32030 with epoch 0
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MetricsIngestorApplicationKt : Started MetricsIngestorApplicationKt in 66.834 seconds (JVM running for 67.544)

更新 2:

我已经调试了这背后的逻辑,它发生在 doBindProducer() 期间方法。它获取主题的分区,并在 KafkaMessageChannelBinder 中为其创建一个 ProducerFactory|。 .
    @Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
}, destination.getName());

正确检索此列表后 List<PartitionInfo> partitionsFor ,它会卡在 KafkaProducer.destroy() 中,直到 30 秒超时到期:

enter image description here

为什么会堵在那里?会不会是活页夹的错误?

最佳答案

我不确定为什么关闭超时,但您应该能够配置该超时。

请针对活页夹打开一个问题;它目前不支持从默认值(30 秒)减少关闭超时。

关于apache-kafka - 使用 enable.idempotence true 时,Spring Cloud Stream Kafka 应用程序的启动速度极慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61205629/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com