gpt4 book ai didi

scala - 使用Spark 2.0.2从Kafka读取Avro消息(结构化流)

转载 作者:行者123 更新时间:2023-12-04 06:55:30 25 4
gpt4 key购买 nike

我有一个spark 2.0应用程序,该应用程序使用Spark Streaming(带有spark-streaming-kafka-0-10_2.11)从kafka读取消息。

结构化流看起来真的很酷,所以我想尝试迁移代码,但是我不知道如何使用它。

在常规流中,我使用kafkaUtils来创建Dstrean,在传递的参数中,它是值反序列化器。

在结构化流中,文档说我应该使用DataFrame函数反序列化,但是我无法确切知道这意味着什么。

我查看了诸如example这样的示例,但是我在Kafka中的Avro对象非常复杂,不能像示例中的String那样简单地进行转换。

到目前为止,我尝试了这种代码(我在另一个问题中看到了这种代码):

import spark.implicits._

val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()

ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()

我得到“数据类型不匹配:无法将BinaryType强制转换为StructType(StructField(....“)。

我如何反序列化值(value)?

最佳答案

如上所述,自Spark 2.1.0起,批处理读取器支持avro,但SparkSession.readStream()不支持。这是我根据其他响应在Scala中工作的方式。为了简化起见,我简化了架构。

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

// Create avro schema and reader
case class KafkaMessage (
deviceId: Int,
deviceName: String
)
val schemaString = """{
"fields": [
{ "name": "deviceId", "type": "int"},
{ "name": "deviceName", "type": "string"},
],
"name": "kafkamsg",
"type": "record"
}""""
val messageSchema = new Schema.Parser().parse(schemaString)
val reader = new GenericDatumReader[GenericRecord](messageSchema)
// Factory to deserialize binary avro data
val avroDecoderFactory = DecoderFactory.get()
// Register implicit encoder for map operation
implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

def main(args: Array[String]) {

val KafkaBroker = args(0);
val InTopic = args(1);
val OutTopic = args(2);

// Get Spark session
val session = SparkSession
.builder
.master("local[*]")
.appName("myapp")
.getOrCreate()

// Load streaming data
import session.implicits._
val data = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KafkaBroker)
.option("subscribe", InTopic)
.load()
.select($"value".as[Array[Byte]])
.map(d => {
val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
val deviceId = rec.get("deviceId").asInstanceOf[Int]
val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
new KafkaMessage(deviceId, deviceName)
})

关于scala - 使用Spark 2.0.2从Kafka读取Avro消息(结构化流),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40705926/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com