
spring-boot - How do I properly externalize spring-boot kafka-streams configuration in a properties file?

Reposted. Author: 行者123. Updated: 2023-12-04 15:44:15

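I'm trying to externalize the configuration of a spring-kafka application that is currently written in Java code. Should I put the ProducerConfig and ConsumerConfig values into spring.kafka.streams.properties, or will they be configured properly if I provide them through spring.kafka.producer and spring.kafka.consumer?

So far it seems that I should put all of my configuration into a bean of type KafkaStreamsConfiguration in order to configure my kafka-streams application. Currently I do this by setting the ProducerConfig and ConsumerConfig values directly in code.

When I externalize this configuration, setting the ProducerConfig and ConsumerConfig property values in the application.properties file seems to have no bearing on whether they end up in the KafkaStreamsConfiguration created by spring-boot (I confirmed this by autowiring the configuration somewhere and inspecting it).

If I instead provide the ProducerConfig and ConsumerConfig values through spring.kafka.streams.properties, they do show up in the KafkaStreamsConfiguration.

Here is my old Java configuration: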

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    props.put("replication.factor", replicationFactor);
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
    props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "600000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    return new KafkaStreamsConfiguration(props);
}

With the following properties, the ProducerConfig and ConsumerConfig values are not in the KafkaStreamsConfiguration at runtime:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.consumer.auto-offset-reset=latest #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

However, the following does result in the KafkaStreamsConfiguration having the expected values:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.streams.properties.group-id=<group_id> #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.compression-type=lz4 #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.streams.properties.auto-offset-reset=latest #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

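I was expecting the ProducerConfig and ConsumerConfig values to propagate to the KafkaStreamsConfiguration when set through spring.kafka.producer and spring.kafka.consumer respectively, especially since IntelliJ offers Intellisense for the producer and consumer configuration in application.properties.

That said, do I need to make sure they are set through spring.kafka.streams.properties in order to configure the application properly?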

Best answer

spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration

Streams sets the group.id to the value of the application.id property.

public static final String APPLICATION_ID_CONFIG = "application.id";

private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";

See KafkaProperties.

The streams, producer and consumer properties are distinct and unrelated.
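
To illustrate why the three namespaces do not mix, here is a minimal sketch in plain Java. The `extract` helper and the `PrefixBindingSketch` class are hypothetical and only model the idea of prefix-based binding; this is not Spring Boot's actual implementation. It shows that a key under spring.kafka.producer never reaches a map built from the spring.kafka.streams.properties prefix.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class PrefixBindingSketch {

    // Hypothetical helper: collect every key that starts with `prefix`
    // and strip the prefix. A simplified model, not Spring Boot's code.
    public static Map<String, String> extract(Properties source, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : source.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), source.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties app = new Properties();
        app.setProperty("spring.kafka.producer.compression-type", "lz4");
        app.setProperty("spring.kafka.consumer.auto-offset-reset", "latest");
        app.setProperty("spring.kafka.streams.properties.compression.type", "lz4");
        app.setProperty("spring.kafka.streams.properties.state.dir", "/var/lib/kafka-streams");

        Map<String, String> streams = extract(app, "spring.kafka.streams.properties.");

        // Only keys from the streams namespace are present.
        System.out.println(streams.containsKey("compression.type"));  // true
        System.out.println(streams.containsKey("auto.offset.reset")); // false
    }
}
```

In this model, the consumer's auto-offset-reset setting is simply invisible to the streams map, which matches what the question observed in the autowired KafkaStreamsConfiguration.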

spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration

compression-type is not exposed as a first-class Boot property for streams. You can use

spring.kafka.streams.properties.compression.type=gzip
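
The same pass-through can carry the other settings from the question. Kafka Streams also recognizes the `producer.` and `consumer.` key prefixes (see StreamsConfig.producerPrefix and StreamsConfig.consumerPrefix) for settings aimed at its embedded clients. The fragment below is a sketch that assumes the entries under spring.kafka.streams.properties are handed to Kafka Streams unchanged; the values are taken from the question. Note that the keys use Kafka's dotted names (compression.type, auto.offset.reset), not Boot's hyphenated ones.

```properties
# Scoped to the producer that Kafka Streams creates internally
spring.kafka.streams.properties.producer.compression.type=lz4
# Scoped to the consumers that Kafka Streams creates internally
spring.kafka.streams.properties.consumer.auto.offset.reset=latest
```

There is no need to set a group id here, since Streams derives group.id from application.id.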

Regarding "spring-boot - How do I properly externalize spring-boot kafka-streams configuration in a properties file?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56572044/
