gpt4 book ai didi

java - 在运行时重新加载 Spring Kafka 使用的 SSL 上下文

转载 作者:行者123 更新时间:2023-12-03 21:04:47 29 4
gpt4 key购买 nike

使 Spring Kafka 重新加载 SSL 上下文的推荐方法是什么?

我需要将新证书插入到我的 Kafka 制作人使用的信任存储中,而无需停机。

然而,我发现一旦应用程序启动并创建了 Kafka 生产者,an instance of SSLContext is created and cached .那里is a way to reconfigure到目前为止,我发现的唯一方法是通过 invoking the destroy method on DefaultKafkaProducerFactory 销毁任何现有的生产者。 (在证书更新后)导致对 KafkaTemplate.send 的任何后续调用强制创建一个新的生产者,然后重新加载 SSL 上下文。

我觉得这就像用大锤解决这个问题,可能会有更优雅的解决方案。我还注意到,如果我在发送消息时调用 destroy ,我会收到以下异常,当我们不能丢失任何事件时,这看起来不是很积极。

java.util.concurrent.CompletionException: org.apache.kafka.common.KafkaException: Producer closed while send in progress
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: Producer closed while send in progress
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:826)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:190)
at org.springframework.kafka.core.KafkaOperations$send.call(Unknown Source)
at com.example.event.publisher.kafka.KafkaEventPublisher.doPublish(KafkaEventPublisher.groovy:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:98)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)
at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:352)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1034)
at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.callCurrent(PogoMetaClassSite.java:68)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:177)
at com.example.event.publisher.kafka.KafkaEventPublisher$_publish_closure1.doCall(KafkaEventPublisher.groovy:47)
at com.example.event.publisher.kafka.KafkaEventPublisher$_publish_closure1.doCall(KafkaEventPublisher.groovy)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:98)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)
at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:264)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1034)
at groovy.lang.Closure.call(Closure.java:418)
at org.codehaus.groovy.runtime.ConvertedClosure.invokeCustom(ConvertedClosure.java:54)
at org.codehaus.groovy.runtime.ConversionHandler.invoke(ConversionHandler.java:124)
at com.sun.proxy.$Proxy103.get(Unknown Source)
at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590)
... 6 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Requested metadata update after close
at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:200)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
... 37 common frames omitted

最佳答案

看起来简单地重置某些值的配置将触发 SSL 引擎工厂的重建。他们甚至调出会导致热重载的文件 key 配置。
link

关于java - 在运行时重新加载 Spring Kafka 使用的 SSL 上下文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55001632/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com