- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:
spring:
cloud:
function:
definition: myProducer1;myProducer2
stream:
bindings:
myproducer1-out-0:
destination: topic1
producer:
useNativeEncoding: true
myproducer2-out-0:
destination: topic2
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: false
retries: 10000
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 11.288 seconds (JVM running for 11.868)
enabled.idempotence: true
.有了这个改变,启动时间慢了 7 倍(有时甚至超过 10 倍):
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 71.489 seconds (JVM running for 72.127)
Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.
),
有时它发生在其中一个生产者身上,其他人发生在两个生产者身上,其他人都没有发生 .当它没有出现时,启动速度和以前一样快。
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864007183
org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 32029 with epoch 0
ProducerId set to 32029 with epoch 0
卡住 30 秒后,它记录了
Proceeding to force close...
的信息消息并初始化第二个生产者没有任何问题:
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer1-out-0' has 1 subscriber(s).
o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: topic2
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
...
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864038612
org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to 32030 with epoch 0
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MetricsIngestorApplicationKt : Started MetricsIngestorApplicationKt in 66.834 seconds (JVM running for 67.544)
doBindProducer()
期间方法。它获取主题的分区,并在
KafkaMessageChannelBinder
中为其创建一个 ProducerFactory|。 .
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
}, destination.getName());
List<PartitionInfo> partitionsFor
,它会卡在 KafkaProducer.destroy() 中,直到 30 秒超时到期:
最佳答案
我不确定为什么关闭超时,但您应该能够配置该超时。
请针对活页夹打开一个问题;它目前不支持从默认值(30 秒)减少关闭超时。
关于apache-kafka - 使用 enable.idempotence true 时,Spring Cloud Stream Kafka 应用程序的启动速度极慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61205629/
我正在编写一个 OpenGL C++ 包装器。该包装器旨在减少复杂且易出错的用法。 例如,我目前希望用户只关注一点点 OpenGL Context。为此,我编写了一个类 gl_texture_2d。众
A post from another thread表示如果一个函数可以被多次调用而不改变结果,则该函数被称为幂等。 然而,所使用的术语(如无副作用和返回相同结果)相对含糊不清。考虑这段代码: pub
我在研究 Scala DB 框架/包装器时,偶然发现了来自 Twitter 的 Gizzard。虽然起初给我留下了深刻的印象,但当我阅读限制时,我冷静下来了。他们说您进行的所有数据库操作都必须是幂等的
我正在使用Node SDK for AWS ,我有一个关于 createLogGroup 的问题和 createLogStream操作,这些操作是幂等的吗?即我可以多次调用 create 而不必担心重
我在我的 Django 应用程序中使用 Stripe。我有以下测试用例:incorrect_cvc导致 card_error。现在在纠正 CVC 并使用 4242 4242 4242 4242 时我除
我想直接向一个 stripe 连接的账户收费,我可以通过以下方式做到这一点 const charge = { amount, currency, source} ; return stri
我正在像这样调用 ecs.create_service: createServiceResponse = ecs.create_service( clientToken='abc123', clust
假设我们有一个包含以下数据的表: CREATE TABLE tab(i INT PRIMARY KEY); INSERT INTO tab(i) VALUES(1),(2),(3); SELECT *
我是 Rails 的新手,我想弄清楚 rake db:create和 rake db:migrate是幂等的。 换句话说,我可以对我的数据库(postgres 或 mysql)重复运行这两个命令而不会
我的 Scs 应用程序有两个具有此配置的 Kafka 生产者: spring: cloud: function: definition: myProducer1;myProdu
我是一名优秀的程序员,十分优秀!