gpt4 book ai didi

java - 卡夫卡消费者 : Stop processing messages when exception was raised

转载 作者:搜寻专家 更新时间:2023-11-01 03:16:27 24 4
gpt4 key购买 nike

我对 (Spring) Kafka 在停止 ConcurrentMessageListenerContainer 之后/时的 poll() 行为感到有点困惑。

我想要实现的目标:在引发异常(例如消息无法保存到数据库)后停止消费者,不提交偏移量,在给定时间后重新启动它并从先前失败的消息开始再次处理。

我读过这篇文章说容器将使用轮询中的剩余记录调用监听器 ( https://github.com/spring-projects/spring-kafka/issues/451 ) 这意味着不能保证在失败的消息之后,成功处理的进一步消息将提交抵消。这可能会导致消息丢失/跳过。

真的是这样吗?如果是,是否有无需升级新版本即可解决此问题的解决方案? (DLQ 不是我的案例的解决方案)

我已经做了:设置 setErrorHandler()setAckOnError(false)

private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps,  Class keyDeserializer) {
Map<String, Object> props = new HashMap<>();
//Set common props
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);

if (kafkaProps.isSslEnabled()) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
}

return props;
}

消费者

public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();

ContainerProperties containerProperties = new ContainerProperties(...);
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
containerProperties.setAckOnError(false);
containerProperties.setErrorHandler(stoppingErrorHandler);

ConcurrentMessageListenerContainer<String, byte[]> container = ...
container.setConcurrency(1); //use only one container
stoppingErrorHandler.setConcurrentMessageListenerContainer(container);

return container;
}

错误处理器

public class StoppingErrorHandler implements ErrorHandler {

@Setter
private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;

@Value("${backends.kafka.consumer.halt.timeout}")
int consumerHaltTimeout;

@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
if (concurrentMessageListenerContainer != null) {
concurrentMessageListenerContainer.stop();
}

new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
concurrentMessageListenerContainer.start();
}
}
}, consumerHaltTimeout);
}
}

我在用什么:

  <groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.1.2.RELEASE</version>

<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.7.RELEASE</version>

最佳答案

without upgrading the newer versions?

2.1引入了ContainerStoppingErrorHandler这是一个 ContainerAwareErrorHandler,剩余未使用的消息将被丢弃(并且将在容器重新启动时重新获取)。

对于早期版本,您的监听器将需要拒绝(失败)批处理中的剩余消息(或设置 max.records.per.poll=1)。

关于java - 卡夫卡消费者 : Stop processing messages when exception was raised,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48886127/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com