gpt4 book ai didi

java - Kafka - 如何跳过偏移量中的错误消息并使用其余消息

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:33:02 26 4
gpt4 key购买 nike

我正在使用 Kafka 和 Avro 进行序列化/反序列化以处理事件。如果不符合 avro 模式的错误数据偶然出现在主题中,

.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer app: host: dcId: envId: url: reqId: jsess: secSessId: logUser: effUser: impUser: channelName: - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition EventProcessor-0 at offset 2845. If needed, please seek past the record to continue consumption.

并且消息在相同的偏移量下不断增长。是否有可能跳过这个偏移量并继续从其他偏移量读取,如果再次发生同样的情况,也跳过它?

消费者代码:

@KafkaListener(topics = "EventProcessor", containerFactory = "eventProcessorListenerContainerFactory")
public void listen(Event payLoad) {

System.out.println("REceived message ===> " + payLoad);

}

工厂:

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> eventProcessorListenerContainerFactory() {

Map<String, Object> propMap = new HashMap<String, Object>();
propMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
propMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propMap.put(ConsumerConfig.GROUP_ID_CONFIG, "EventProcessorConsumer");

DefaultKafkaConsumerFactory<String, Event> consuemrFactory = new DefaultKafkaConsumerFactory<String, Event>(
propMap);

consuemrFactory.setValueDeserializer(new AvroDeSerializer<Event>(
Event.class));
ConcurrentKafkaListenerContainerFactory<String, Event> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
listenerFactory.setConsumerFactory(consuemrFactory);
listenerFactory.setConcurrency(3);
listenerFactory.setRetryTemplate(retryTemplate());
listenerFactory.getContainerProperties().setPollTimeout(3000);
return listenerFactory;
}

最佳答案

尝试按照@Poppy 的建议调整您的政策

SimpleRetryPolicy policy = new SimpleRetryPolicy();
// Set the max retry attempts
policy.setMaxAttempts(5);
// Retry on all exceptions (this is the default)
policy.setRetryableExceptions(new Class[] {Exception.class});
// ... but never retry SerializationException
policy.setFatalExceptions(new Class[] {SerializationException.class}); //<-- here

// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<Foo>() {
public Foo doWithRetry(RetryContext context) {
// business logic here
}
});

来自这里:https://docs.spring.io/spring-batch/3.0.x/reference/html/retry.html

关于java - Kafka - 如何跳过偏移量中的错误消息并使用其余消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50043496/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com