gpt4 book ai didi

scala - Reactive-Kafka Stream Consumer : Dead letters occured

转载 作者:行者123 更新时间:2023-12-02 01:17:48 27 4
gpt4 key购买 nike

我正在尝试使用 akka 的响应式(Reactive) kafka 库来使用来自 Kafka 的消息。我正在打印一条消息,然后我得到了

[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSourceConsumerMain/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://CommittableSourceConsumerMain/deadLetters] to Actor[akka://CommittableSourceConsumerMain/system/kafka-consumer-1#-1726905274] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

这是我正在执行的代码

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import play.api.libs.json._
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object CommittableSourceConsumerMain extends App {

implicit val system = ActorSystem("CommittableSourceConsumerMain")
implicit val materializer = ActorMaterializer()
val consumerSettings =ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer).withBootstrapServers("localhost:9092").withGroupId("CommittableSourceConsumer").withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val done =
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
val record=(msg.record.value())
val data=Json.parse(record)

val recordType=data \ "data" \"event" \"type"

val actualData=data \ "data" \ "row"

if(recordType.as[String]=="created"){
"Some saving logic"
}

else{

"Some logic"

}
msg.committableOffset.commitScaladsl()
}
.runWith(Sink.ignore)
}

最佳答案

我终于想出了解决办法。由于流中的运行时异常,返回失败的 Future 立即终止流。Akka-stream 不提供或显示运行时异常。从而知道异常

done.onFailure{
case NonFatal(e)=>println(e)
}

异常出现在 if-else block 中。如果发生异常,也可以使用 Actor Strategy 恢复流。

关于scala - Reactive-Kafka Stream Consumer : Dead letters occured,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41825020/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com