gpt4 book ai didi

java - 卡夫卡生产者: how to handle "java.net.ConnectException: Connection refused"

转载 作者:太空宇宙 更新时间:2023-11-04 12:01:25 32 4
gpt4 key购买 nike

我使用 Kafka 0.10.1.0。

这是我的制作人:

val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)

但是上面的回调无法处理java.net.ConnectException: Connection refused当 kafka 服务器宕机时。

UPD

ConnectionException在另一个线程中引发(进入 Sender 类,该类用于 KafkaProducer )。因此我们不能使用try {} catch为了它。另外,我不需要重试机制,我需要一种方法来处理这种情况(例如,如果 Kafka 宕机并且生产者无法发送消息,那么我将使用其他一些队列 API)。

有没有办法处理这个异常?

最佳答案

您有几个选择。 Scala 提供了一种捕获异常的方法,其形式如下:

   try { 
// ...
}
catch {
case ioe: IOException => ... // more specific cases first !
case e: Exception => ...
}

所以最简单的方法是:

  try { 
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
}
catch {
case ce: ConnectionException => // handle exception
}

更复杂但更强大的是重试机制:

What's the Scala way to implement a retry-able call like this one?

另请注意,Kafka Producer 内置了重试机制,这也可能会有所帮助:

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

关于java - 卡夫卡生产者: how to handle "java.net.ConnectException: Connection refused",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40866634/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com