gpt4 book ai didi

scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes]

转载 作者:行者123 更新时间:2023-12-04 03:14:34 26 4
gpt4 key购买 nike

我正在使用 scala 并使用以下 Spark Streaming 方法从 Kafka 消费数据:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

上面的变量返回 InputDStream,通过它我可以使用下面的代码查看原始/二进制格式的数据:println(行)

但我需要在原始/二进制格式上应用 avro 格式(可用模式),以便以预期的 json 格式查看数据。为了应用 avro 格式,我需要将上面的 InputDStream 转换为 avro 使用的 Array[Bytes]。

有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?

或者

如果您知道在 InputDStream(of spark Streaming)上应用 avro 模式的更好方法,请分享。

最佳答案

您需要做两件事。第一种是为 Kafka 使用 DefaultDecoder,它为您提供一个 Array[Byte] 作为值类型:

val lines: DStream[(String, Array[Byte])] = 
KafkaUtils
.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

然后您需要通过额外的 map 应用您的 Avro 反序列化逻辑:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

avroDeserializer 是您的任意类,它知道如何从 Avro 字节创建您的类型。

我个人使用avro4s通过宏获取案例类反序列化。

关于scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42213761/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com