gpt4 book ai didi

java - Kafka - 如何在 Producer 类中获取失败的消息详细信息

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:49:48 24 4
gpt4 key购买 nike

Kafka 允许通过 Producer (KafkaProducer) 类的以下方法发送异步消息:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record) 
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

成功可以通过
来处理1) Future<RecordMetaData>对象或
2) onCompletion回调调用的方法。 onCompletion的完整方法签名和使用如下(taken from kafka docs)

`

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);  
producer.send(record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});

虽然失败需要通过Exception e处理传递给了 onCompletion方法

很好,到目前为止一切看起来都很好。

但如果我没弄错的话,任何可以从exception 获得的合理信息或 e对象是堆栈跟踪和异常消息。我想在这里指出的是,e不包含实际发送记录的任何信息。或者换句话说,它不包含对实际 record 的引用。已发送给卡夫卡经纪人。那么,如果record,生产者可以进行哪些有用的加工或处理?没有发送成功。真的不多。为什么我这么说 - 理想情况下,我想在某个地方记录失败的消息,然后尝试重新发送它。但是框架提供的信息很少(e),我觉得这是不可能的。

谁能指出我是对还是错?

最佳答案

您可以轻松地创建一个回调,将 producerRecord 作为构造函数参数接收。因此,在出现异常的 onCompletion 时,您可以完全了解生产者记录,甚至可以尝试再次发送它。

我遇到过同样的问题。创建了一个同时获取 producerRecord 的回调,以及一个使用执行程序服务再次发送记录的回调处理程序。所以最终,我可以容忍任何数量的故障(例如网络问题或 kafka 宕机),并从中恢复。

关于java - Kafka - 如何在 Producer 类中获取失败的消息详细信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45528743/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com