gpt4 book ai didi

scala - 从Kafka读取json并将json写入其他Kafka主题

转载 作者:行者123 更新时间:2023-12-02 20:31:50 27 4
gpt4 key购买 nike

我正在尝试为 Spark 流准备应用程序(Spark 2.1、Kafka 0.10)

我需要从Kafka主题“输入”读取数据,找到正确的数据并将结果写入主题“输出”

我可以基于KafkaUtils.createDirectStream方法从Kafka读取数据。

我将 RDD 转换为 json 并准备过滤器:

val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>

val PeopleDf=spark.read.schema(schema1).json(rdd)
import spark.implicits._
PeopleDf.show()
val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1"))||($"value2" === 2))
PeopleDfFilter.show()
}

我可以使用 KafkaProducer 从 Kafka 加载数据并“按原样”写入 Kafka:

    messages.foreachRDD( rdd => {
rdd.foreachPartition( partition => {
val kafkaTopic = "output"
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
partition.foreach{ record: ConsumerRecord[String, String] => {
System.out.print("########################" + record.value())
val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
producer.send(messageResult)
}}
producer.close()
})

})

但是,我无法集成这两个操作 > 在 json 中查找正确的值并将结果写入 Kafka:以 JSON 格式写入 PeopleDfFilter 来“输出”Kafka 主题。

我在 Kafka 中有很多输入消息,这就是我想使用 foreachPartition 创建 Kafka 生产者的原因。

最佳答案

这个过程非常简单,为什么不完全使用结构化流呢?

import org.apache.spark.sql.functions.from_json

spark
// Read the data
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", inservers)
.option("subscribe", intopic)
.load()
// Transform / filter
.select(from_json($"value".cast("string"), schema).alias("value"))
.filter(...) // Add the condition
.select(to_json($"value").alias("value")
// Write back
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", outservers)
.option("subscribe", outtopic)
.start()

关于scala - 从Kafka读取json并将json写入其他Kafka主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47457755/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com