
java - Handling errors with @KafkaListener

Reposted · Author: 行者123 · Updated: 2023-11-30 10:31:47

I am using spring-kafka with the following configuration:

package com.danigu.fancypants.infrastructure;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

/**
 * @author dani
 */
@Data
@EnableKafka
@Configuration
@Import({KafkaConfigurationProperties.class})
public class KafkaConfiguration {

    @Inject KafkaConfigurationProperties kcp;

    protected Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kcp.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kcp.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    @Bean
    public StringJsonMessageConverter stringJsonMessageConverter(ObjectMapper mapper) {
        return new StringJsonMessageConverter(mapper);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            StringJsonMessageConverter messageConverter) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setMessageConverter(messageConverter);
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        factory.setRetryTemplate(retryTemplate());

        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy();
        policy.setMaxAttempts(3);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();

        template.setRetryPolicy(retryPolicy());
        template.setBackOffPolicy(backOffPolicy());

        return template;
    }
}

My listener looks like this:

package com.danigu.fancypants.integration.inbound.dress;

import com.danigu.fancypants.integration.inbound.InvalidRequestException;
import com.danigu.fancypants.integration.inbound.dress.payload.DressRequest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.util.Set;

/**
 * @author dani
 */
@Component
public class DressListener {

    @Inject protected Validator validator;

    @KafkaListener(topics = {"${kafka.dressesTopic}"})
    public void onMessage(@Payload DressRequest request, Acknowledgment acknowledgment) {
        assertValidRequest(request);

        System.out.println(request);

        acknowledgment.acknowledge();
    }

    protected void assertValidRequest(DressRequest request) {
        final Set<ConstraintViolation<DressRequest>> violations = validator.validate(request);

        if (!violations.isEmpty()) {
            throw new InvalidRequestException(violations, request);
        }
    }
}

So far I have been looking at spring-kafka's tests and the reference documentation. The documentation there says that an ErrorHandler of the appropriate type should be configured, and this test suggests configuring it on the ContainerProperties. However, that is only a single error handler, and in my use case I would like to define several (one per payload type). Is that possible, and if so, how?

Also, is there a way to specify which error handler should be used for a given annotated listener method?

Also, is there a way to specify a RecoveryCallback per @KafkaListener, or perhaps per topic, or does there have to be a separate ListenerContainerFactory for that?

I might have this completely wrong; could someone point me in the right direction? How do I correctly configure multiple ErrorHandlers for different payload types?

Best answer

I'm not sure what you mean by "different payload types", since you only have one @KafkaListener. A class-level @KafkaListener can have method-level @KafkaHandler methods for different payload types.
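For reference, a minimal sketch of that pattern. ShoeRequest is a hypothetical second payload type introduced only for illustration, and how reliably records are routed to the right method depends on the configured message converter producing distinguishable payload types:

import com.danigu.fancypants.integration.inbound.dress.payload.DressRequest;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Class-level @KafkaListener: each record from the topic is dispatched to the
// @KafkaHandler method whose parameter type matches the converted payload.
@Component
@KafkaListener(topics = {"${kafka.dressesTopic}"})
public class DressTopicListener {

    @KafkaHandler
    public void handle(DressRequest request) {
        System.out.println("dress request: " + request);
    }

    @KafkaHandler
    public void handle(ShoeRequest request) { // ShoeRequest is hypothetical
        System.out.println("shoe request: " + request);
    }
}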

In any case, there is only one error handler per container, so you need a different container factory for each error handler (and the same goes for the recovery callback).
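Concretely, with the configuration above that means adding a second factory method to KafkaConfiguration, giving it its own handler and recovery callback, and pointing the relevant listener at it via @KafkaListener(containerFactory = "dressListenerContainerFactory"). A minimal sketch, assuming the spring-kafka 1.x API used in the question; the handler logic, logging, and bean name are illustrative only:

// Extra imports needed in KafkaConfiguration:
// org.apache.kafka.clients.consumer.ConsumerRecord,
// org.springframework.kafka.listener.ErrorHandler,
// org.springframework.retry.RecoveryCallback.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> dressListenerContainerFactory(
        StringJsonMessageConverter messageConverter) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setMessageConverter(messageConverter);
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setRetryTemplate(retryTemplate());

    // One ErrorHandler per container: called when the listener throws.
    ErrorHandler dressErrorHandler = (thrownException, record) ->
            System.err.println("Failed to process " + record + ": " + thrownException);
    factory.getContainerProperties().setErrorHandler(dressErrorHandler);

    // One RecoveryCallback per factory: called once the retry policy is exhausted.
    // The retrying listener adapter typically stores the current record under the
    // "record" attribute of the retry context.
    RecoveryCallback<Object> recoveryCallback = context -> {
        ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) context.getAttribute("record");
        System.err.println("Retries exhausted for " + record);
        return null;
    };
    factory.setRecoveryCallback(recoveryCallback);

    return factory;
}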

We recently added an errorHandler attribute to @RabbitListener in spring-amqp ...

/**
* Set an {@link RabbitListenerErrorHandler} to invoke if the listener method throws
* an exception.
* @return the error handler.
* @since 2.0
*/
String errorHandler() default "";

...so each method can have its own error handler.
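For comparison, a sketch of how that attribute is used on the spring-amqp side; the queue name and handler bean name are made up for illustration, and the referenced bean is assumed to implement RabbitListenerErrorHandler:

// spring-amqp 2.0+: the named RabbitListenerErrorHandler bean is invoked when
// this listener method throws an exception.
@RabbitListener(queues = "dress-requests", errorHandler = "dressRequestErrorHandler")
public void onMessage(DressRequest request) {
    // handle the message; exceptions are passed to "dressRequestErrorHandler"
}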

We will probably do something similar for the next release of spring-kafka. But there will still be only one per @KafkaListener, so it won't help with a class-level @KafkaListener.

Regarding java - Handling errors with @KafkaListener, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43007650/
