gpt4 book ai didi

scala - 如何将 Spark 流 DF 写入 Kafka 主题

转载 作者:行者123 更新时间:2023-12-03 08:34:18 28 4
gpt4 key购买 nike

我正在使用 Spark Streaming 来处理两个 Kafka 队列之间的数据,但我似乎找不到从 Spark 写入 Kafka 的好方法。我试过这个:

input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")

println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)

它按预期工作,但在真实环境中为每条消息实例化一个新的 KafkaProducer 显然是不可行的,我正在尝试解决它。

我想为每个进程保留对单个实例的引用,并在需要发送消息时访问它。如何从 Spark Streaming 写入 Kafka?

最佳答案

我的第一个建议是尝试在 foreachPartition 中创建一个新实例并测量它是否足够快满足您的需要(在 foreachPartition 中实例化重对象是官方文档建议的)。

另一种选择是使用对象池,如下例所示:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

然而,我发现在使用检查点时很难实现。

另一个对我来说效果很好的版本是一个工厂,如以下博客文章中所述,您只需要检查它是否提供了足够的并行性来满足您的需求(查看评论部分):

http://allegro.tech/2015/08/spark-kafka-integration.html

关于scala - 如何将 Spark 流 DF 写入 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31590592/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com