
java - What causes a ProducerFencedException during producer.send?


I am trying to load about 50K messages into a Kafka topic. In a few of the runs, the exception below appears at the start, but not every time.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state  
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]
...
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.

The code block is as follows:
public void persistUpdatesPostAction(List<Message> messageList) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : " + messageList.size());

    Producer<String, String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}
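Worth noting about the catch block above: the official KafkaProducer documentation treats ProducerFencedException (along with OutOfOrderSequenceException and AuthorizationException) as fatal, meaning the producer should be closed without calling abortTransaction() at all, since abortTransaction() on a fenced producer throws again, which is exactly the KafkaException at the top of the stack trace. The sketch below shows that fatal-vs-retriable split as runnable logic; the nested exception classes are stand-ins for the real org.apache.kafka.common.errors types so the control flow can execute without a broker or the kafka-clients jar, and with the real library you would catch the real classes the same way.

```java
// Sketch: fatal vs. retriable error handling for a transactional producer.
// The nested exception classes are stand-ins (names only) for the real
// org.apache.kafka.common.errors types, so this compiles without kafka-clients.
public class TxErrorHandling {

    // Stand-ins for the real Kafka exception types (assumption: names only).
    static class ProducerFencedException extends RuntimeException {}
    static class OutOfOrderSequenceException extends RuntimeException {}
    static class AuthorizationException extends RuntimeException {}

    enum Action { CLOSE_PRODUCER, ABORT_AND_RETRY }

    // Fenced/out-of-order/authorization errors mean the producer is unusable:
    // calling abortTransaction() on it throws again. Only other exceptions
    // leave the producer in a state where abortTransaction() is safe.
    static Action decide(RuntimeException e) {
        if (e instanceof ProducerFencedException
                || e instanceof OutOfOrderSequenceException
                || e instanceof AuthorizationException) {
            return Action.CLOSE_PRODUCER;  // do NOT call abortTransaction()
        }
        return Action.ABORT_AND_RETRY;     // abortTransaction(), then retry
    }

    public static void main(String[] args) {
        System.out.println(decide(new ProducerFencedException()));
        System.out.println(decide(new RuntimeException("transient send failure")));
    }
}
```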

-----------

static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}

Best Answer

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.


This exception message is not very helpful. I believe it is trying to say that the broker no longer has any record of the transactional ID the client sent. This can happen because:
  • Someone else was using the same transactional ID and has already committed it. In my experience, this is unlikely unless you are sharing transactional IDs between clients. We use UUID.randomUUID() to ensure our IDs are unique.
  • The transaction timed out and was removed by broker automation.

  • In our case, we were hitting the transaction timeout regularly, which produced this exception. Two properties govern how long the broker remembers a transaction before aborting it and forgetting it.
  • transaction.max.timeout.ms -- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. The default in many Kafka versions appears to be 900000 (15 minutes). Documentation from Kafka says:

    The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.


  • transaction.timeout.ms -- A producer client property that sets the timeout in milliseconds when a transaction is created. The default in many Kafka versions appears to be 60000 (1 minute). Documentation from Kafka says:

    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.



  • If the transaction.timeout.ms property set in the client exceeds the transaction.max.timeout.ms property on the broker, the producer will immediately throw an exception like the following:
    org.apache.kafka.common.KafkaException: Unexpected error in
    InitProducerIdResponse The transaction timeout is larger than the maximum value
    allowed by the broker (as configured by transaction.max.timeout.ms).
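Putting the two remedies above together, a producer configuration for long-running batches might raise the client-side transaction timeout (while staying under the broker's limit) and give each producer instance a unique transactional.id. This is a sketch, not the original poster's code: the "loader-" prefix and the 300000 ms value are illustrative assumptions, while the property keys themselves are real Kafka producer configs.

```java
import java.util.Properties;
import java.util.UUID;

// Sketch of producer properties for the timeout scenario discussed above.
// Property keys are real Kafka producer configs; the values are illustrative.
public class TxProducerProps {

    static Properties transactionalProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("acks", "all");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // A unique transactional.id per producer instance avoids fencing
        // caused by two clients sharing the same id.
        p.put("transactional.id", "loader-" + UUID.randomUUID());
        // Raise the client-side transaction timeout (default 60000 ms) for
        // long batches. It must stay <= the broker's transaction.max.timeout.ms
        // (default 900000 ms), or initTransactions() fails with the
        // InitProducerIdResponse error shown above.
        p.put("transaction.timeout.ms", "300000");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(transactionalProps());
    }
}
```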

Regarding "java - What causes a ProducerFencedException during producer.send?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53058715/
