gpt4 book ai didi

java - @KafkaListener 不会读取以前的消息,即使它们没有被确认,除非应用程序重新启动

转载 作者:行者123 更新时间:2023-12-01 19:37:46 26 4
gpt4 key购买 nike

所以基本上我看到的是我正在使用 @KafkaListener 来消费消息,并且故意不想确认它们,因此消费者会忽略它们并监听主题中的新消息。因此,如果应用程序重新启动,消费者只能从最后提交的消息开始。因此,我想避免这种情况,我希望消费者在不重新启动的情况下考虑未提交的消息。下面是我正在使用的配置和@kafkalistener。

ReceiverConfig.java

package com.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.group}")
private String group;

@Value("${spring.kafka.auto-offset-reset}")
private String offset;

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "CAS-CLIENT-001");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //TODO Remove if you want Kafka to automatically acknowledge
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); //TODO Remove if you want Kafka to automatically acknowledge
return factory;
}
}

和 ReceiverService

package com.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class ReceiverService {

private final Logger LOG = LoggerFactory.getLogger(ReceiverService.class);

@KafkaListener(topics = "${spring.kafka.topic}")
private synchronized void consumeKafkaQueue(@Payload String message, Acknowledgment acknowledgment) {
LOG.info("Received message from kafka queue: {}", message);
//TODO Do your code here...
//acknowledgment.acknowledge();
//Note if the "acknowledgment" parameter is left out Kafka will be acknowledged as soon as the method finishes.
}
}

最佳答案

有什么值(value)?

@Value("${spring.kafka.auto-offset-reset}")
private String offset;

here您可以看到所有消费者配置

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.

您应该尽早使用

关于java - @KafkaListener 不会读取以前的消息,即使它们没有被确认,除非应用程序重新启动,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59197451/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com