
spring-boot - poll() and commitAsync() in Spring-Kafka

Reposted · Author: 行者123 · Updated: 2023-12-04 08:53:24

I'm trying to write a Kafka consumer application in Java on the Spring Boot platform. I had written the code in plain Java earlier, but I'm now converting it to spring-kafka, since it offers some advantages over plain Java. There are a couple of things I'm trying to figure out:

  • It seems I don't have to write an explicit poll() loop in spring-kafka; is it handled automatically by @KafkaListener?

  • I have set enable.auto.commit='false' because I need to do some processing before committing offsets. How can I perform a commitAsync() in Spring-Kafka?

    KafkaConsumerConfig.java:

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {

        @Value("${app.kafka_brokers}")
        private String KAFKA_BROKERS;

        @Value("${app.topic}")
        private String KAFKA_TOPIC;

        @Value("${app.group_id_config}")
        private String GROUP_ID_CONFIG;

        @Value("${app.schema_registry_url}")
        private String SCHEMA_REGISTRY_URL;

        @Value("${app.offset_reset}")
        private String OFFSET_RESET;

        @Value("${app.max_poll_records}")
        private String MAX_POLL_RECORDS;

        @Value("${app.security.protocol}")
        private String SSL_PROTOCOL;

        @Value("${app.ssl.truststore.password}")
        private String SSL_TRUSTSTORE_SECURE;

        @Value("${app.ssl.keystore.password}")
        private String SSL_KEYSTORE_SECURE;

        @Value("${app.ssl.key.password}")
        private String SSL_KEY_SECURE;

        @Value("${app.ssl.truststore.location}")
        private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;

        @Value("${app.ssl.keystore.location}")
        private String SSL_KEYSTORE_LOCATION_FILE_NAME;

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);

            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            return factory;
        }
    }

KafkaConsumer.java:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {

        System.out.println(record);

        // <-- how to asyncCommit() -->
    }
}

Best Answer

First of all, I recommend you use the properties and auto-configuration that Spring Kafka already provides instead of wiring everything up yourself, since that follows the DRY principle: Don't Repeat Yourself.

spring:
  kafka:
    bootstrap-servers: ${app.kafka_brokers}
    consumer:
      auto-offset-reset: ${app.offset_reset}
      enable-auto-commit: false # <---- disable auto committing
    ssl:
      protocol: ${app.security.protocol}
      key-store-location: ${app.ssl.keystore.location}
      key-store-password: ${app.ssl.keystore.password}
      trust-store-location: ${app.ssl.truststore.location}
      trust-store-password: ${app.ssl.truststore.password}
      # And other properties
    listener:
      ack-mode: manual # This is what you need
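
The remaining consumer settings from KafkaConsumerConfig can be moved into application.yml in the same way. A rough sketch, assuming the same ${app.*} placeholders and Spring Boot's spring.kafka.* property names (worth double-checking the exact keys against the Spring Boot reference for your version):

spring:
  kafka:
    consumer:
      group-id: ${app.group_id_config}
      max-poll-records: ${app.max_poll_records}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      # Client properties Boot has no dedicated key for can be passed
      # through the generic properties map:
      properties:
        schema.registry.url: ${app.schema_registry_url}
    listener:
      concurrency: 3 # replaces factory.setConcurrency(3)

With those in place, the hand-written consumerFactory() and kafkaListenerContainerFactory() beans can usually be removed entirely and Boot's auto-configured ones used instead.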

AckMode documentation: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

Essentially, manual is the asynchronous acknowledgment, while manual_immediate is synchronous.
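
If you would rather keep the factory beans from the question than switch to auto-configuration, the ack mode can also be set programmatically. A minimal sketch, assuming spring-kafka 2.3+ (where AckMode lives in ContainerProperties); the setSyncCommits(false) line is optional and, as far as I understand the container's syncCommits property, is what makes the container use commitAsync() rather than commitSync() for the commits it performs on your behalf:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Drop-in replacement for the kafkaListenerContainerFactory() bean in KafkaConsumerConfig
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    // Offsets are committed only when the listener calls Acknowledgment.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    // Optional: have the container commit with commitAsync() instead of commitSync()
    factory.getContainerProperties().setSyncCommits(false);
    return factory;
}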

Then, in your @KafkaListener component, you can have an org.springframework.kafka.support.Acknowledgment object injected and use it to acknowledge your message.

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {

        System.out.println(record);

        acknowledgment.acknowledge();
    }
}

Here is the documentation on what can be injected into @KafkaListener methods: https://docs.spring.io/spring-kafka/reference/html/#message-listeners

Regarding "spring-boot - poll() and commitAsync() in Spring-Kafka", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/63980000/
