gpt4 book ai didi

apache-kafka - 如何使用 Spring 云流访问使用密码保护的融合架构注册服务器?

转载 作者:行者123 更新时间:2023-12-01 21:57:42 27 4
gpt4 key购买 nike

我正在使用 spring cloud stream 和 Aiven使用 confluent's schema registry 的模式注册表. Aiven 的模式注册表使用密码保护。基于these说明,需要设置这两个配置参数才能成功访问模式注册服务器。

 props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

当我只使用vanilla java的kafka驱动时一切都很好,但是如果我使用Spring cloud stream,我不知道如何注入(inject)这两个参数。目前,我将 "basic.auth.user.info""basic.auth.credentials.source" 放在 "spring.cloud. application.yml 文件中的 stream.kafka.binder.configuration”

这样做,我在模式想要注册的行上得到 "401 Unauthorized"

更新 1:

根据“Ali n”的建议,我更新了配置 SchemaRegistryClient 的 bean 的方式,以便它了解 SSL 上下文。

@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());

final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());

TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();

HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}

这有助于消除应用程序启动时的错误并注册架构。但是,每当应用程序想要向 Kafka 推送消息时,又会抛出一个新的错误。最后这也被 mmelsen 的回答解决了。

最佳答案

我遇到了与我所处情况相同的问题,即连接到由 aiven 托管并由基本身份验证保护的安全模式注册表。为了让它工作,我必须配置以下属性:

spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password

我的 Binder 的其他属性是:

spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false

实际发生的是 Spring 云流将 spring.kafka.properties.basic* 添加到 DefaultKafkaConsumerFactory 并将配置添加到 KafkaConsumer。在 spring kafka 初始化期间的某个时刻,创建了一个 CachedSchemaRegistryClient 并提供了这些属性。此客户端包含一个名为 configureRestService 的方法,该方法将检查属性映射是否包含“basic.auth.credentials.source”。当我们通过 spring.kafka.properties 提供它时,它会找到这个属性,并在访问架构注册表的端点时负责创建适当的 header 。

希望这对你也有用。

我正在使用 spring cloud 版本 Greenwich.SR1、spring-boot-starter 2.1.4.RELEASE、avro-version 1.8.2 和 confluent.version 5.2.1

关于apache-kafka - 如何使用 Spring 云流访问使用密码保护的融合架构注册服务器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55701542/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com