gpt4 book ai didi

scala - Spark Dataframe 以 avro 格式写入 kafka 主题?

转载 作者:行者123 更新时间:2023-12-03 20:18:50 34 4
gpt4 key购买 nike

我在 Spark 中有一个数据框,看起来像

eventDF


   Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
.......
10000|DPS123|OTH

我需要以 Avro 格式将其写入 Kafka 主题
目前我可以使用以下代码在 Kafka 中写入 JSON
val kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
.option("kafka.bootstrap.servers", "Host:port")
.option("topic", "eventdf")
.save()

现在我想以 Avro 格式将其写入 Kafka 主题

最佳答案

Spark >= 2.4 :

您可以使用 to_avro 函数来自 spark-avro 图书馆。

import org.apache.spark.sql.avro._

eventDF.select(
to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)

Spark < 2.4

你必须以同样的方式做到这一点:
  • 创建一个函数,将序列化的 Avro 记录写入 ByteArrayOutputStream并返回结果。一个简单的实现(这仅支持平面对象)可能类似于(从 Kafka Avro Scala Example Sushil Kumar Singh 采纳)
    import org.apache.spark.sql.Row

    def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
    val gr: GenericRecord = new GenericData.Record(schema)
    row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))

    val writer = new SpecificDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(gr, encoder)
    encoder.flush()
    out.close()

    out.toByteArray()
    }
  • 将其转换为 udf :
    import org.apache.spark.sql.functions.udf

    val schema: org.apache.avro.Schema
    val encodeUDF = udf(encode(schema) _)
  • 将其用作 to_json 的替代品
    eventDF.select(
    encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
    )
  • 关于scala - Spark Dataframe 以 avro 格式写入 kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47951668/

    34 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com