gpt4 book ai didi

java - Apache Kafka 系统错误处理

转载 作者:行者123 更新时间:2023-11-30 06:43:49 25 4
gpt4 key购买 nike

我们正在尝试将 Kafka 实现为我们的消息代理解决方案。我们正在 IBM BLuemix 中部署 Spring Boot 微服务,其内部消息代理实现是 Kafka 版本 0.10。由于我的经验更多是在 JMS、ActiveMQ 端,所以我想知道在 java 消费者中处理系统级错误的理想方法应该是什么?

这是我们目前的实现方式

消费者属性

enable.auto.commit=false
auto.offset.reset=latest

我们正在使用默认属性

max.partition.fetch.bytes
session.timeout.ms

卡夫卡消费者

我们为每个主题启动 3 个线程,所有线程都具有相同的 groupId,即每个线程一个 KafkaConsumer 实例。到目前为止我们只有一个分区。线程类的构造函数中的消费者代码如下所示

kafkaConsumer = new KafkaConsumer<String, String>(properties);

final List<String> topicList = new ArrayList<String>();
topicList.add(properties.getTopic());

kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {

@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
try {
logger.info("Partitions assigned, consumer seeking to end.");

for (final TopicPartition partition : partitions) {
final long position = kafkaConsumer.position(partition);
logger.info("current Position: " + position);

logger.info("Seeking to end...");
kafkaConsumer.seekToEnd(Arrays.asList(partition));
logger.info("Seek from the current position: " + kafkaConsumer.position(partition));
kafkaConsumer.seek(partition, position);
}
logger.info("Consumer can now begin consuming messages.");
} catch (final Exception e) {
logger.error("Consumer can now begin consuming messages.");
}

}
});

实际的读取发生在线程的run方法中

try {
// Poll on the Kafka consumer every second.
final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);


// Iterate through all the messages received and print their
// content.
for (final TopicPartition partition : records.partitions()) {

final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
logger.info("consumer is alive and is processing "+ partitionRecords.size() +" records");
for (final ConsumerRecord<String, String> record : partitionRecords) {
logger.info("processing topic "+ record.topic()+" for key "+record.key()+" on offset "+ record.offset());

final Class<? extends Event> resourceClass = eventProcessors.getResourceClass();
final Object obj = converter.convertToObject(record.value(), resourceClass);
if (obj != null) {
logger.info("Event: " + obj + " acquired by " + Thread.currentThread().getName());
final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass));
final MessageResults results = eventProcessors.processEvent(event
);
if ("Success".equals(results.getStatus())) {
// commit the processed message which changes
// the offset
kafkaConsumer.commitSync();
logger.info("Message processed sucessfully");
} else {
kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
logger.error("Error processing message : {} with error : {},resetting offset to {} ", obj,results.getError().getMessage(),record.offset());
break;
}

}
}

}
// TODO add return

} catch (final Exception e) {
logger.error("Consumer has failed with exception: " + e, e);
shutdown();
}

您会注意到EventProcessor,它是一个处理每条记录的服务类,在大多数情况下将记录提交到数据库中。如果处理器抛出错误(系统异常或验证异常),我们不会提交,而是以编程方式将查找设置为该偏移量,以便后续轮询将从该组 ID 的该偏移量返回。

现在的疑问是,这是正确的做法吗?如果我们收到错误并设置偏移量,那么在修复该错误之前不会处理其他消息。这可能适用于系统错误,例如无法连接到数据库,但如果问题仅与该事件有关,而不是其他处理这一记录的问题,我们将无法处理任何其他记录。我们想到了 ErrorTopic 的概念,当我们收到错误时,消费者会将该事件发布到 ErrorTopic,同时它将继续处理其他后续事件。但看起来我们正在尝试将JMS的设计理念(由于我之前的经验)引入到kafka中,并且可能有更好的方法来解决kafka中的错误处理。另外,从错误主题重新处理它可能会改变消息的顺序,这在某些情况下是我们不希望的

请让我知道任何人如何在他们的项目中遵循 Kafka 标准处理这种情况。

-塔莎

最佳答案

if the problem is only with that event and not others to process this one record we wont be able to process any other record

这是正确的,您使用错误主题的建议似乎是可能的。

我还注意到,在处理 onPartitionsAssigned 时,您基本上不使用消费者提交的偏移量,因为您似乎总是会寻找到最后。

如果您想从上次成功提交的偏移量重新启动,则不应执行 seek

最后,我想指出,尽管您似乎知道这一点,但同一组中的 3 个消费者订阅单个分区 - 意味着 3 个消费者中的 2 个将处于空闲状态。

HTH江户

关于java - Apache Kafka 系统错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43985090/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com