gpt4 book ai didi

apache-spark - 结构化流 OOM

转载 作者:行者123 更新时间:2023-12-04 04:09:44 25 4
gpt4 key购买 nike

我在 k8s 运算符上部署了一个结构化流作业,它只是从 kafka 读取数据,反序列化,添加 2 列并将结果存储在数据湖中(尝试了 delta 和 parquet),几天后执行程序增加了内存,最终我得到OOM。输入记录的 kbs 真的很低。P.s 我使用完全相同的代码,但使用 cassandra 作为接收器,现在运行了将近一个月,没有任何问题。有什么想法吗?

enter image description here

enter image description here

我的代码

spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
.option("subscribe", MetisStreamsConfig.topics.head)
.option("startingOffsets", startingOffsets)
.option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.withColumn("payload", from_json($"value", schema))

// selection + filtering
.select("payload.*")
.select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
.select($"qid", $"vessel_id", $"col.*")
.filter($"timestamp".isNotNull)
.filter($"qid".isNotNull and !($"qid"===""))
.withColumn("ingestion_time", current_timestamp())
.withColumn("mapping", MappingUDF($"qid"))
writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
log.info(s"Storing batch with id: `$batchId`")
val calendarInstance = Calendar.getInstance()

val year = calendarInstance.get(Calendar.YEAR)
val month = calendarInstance.get(Calendar.MONTH) + 1
val day = calendarInstance.get(Calendar.DAY_OF_MONTH)
batchDF.write
.mode("append")
.parquet(streamOutputDir + s"/$year/$month/$day")
}
.option("checkpointLocation", checkpointDir)
.start()

我更改为 foreachBatch,因为将 delta 或 parquet 与 partitionBy 一起使用会导致问题更快

最佳答案

Spark 3.1.0 中解决了一个错误。

参见 https://github.com/apache/spark/pull/28904

解决问题的其他方法和调试功劳:

https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read

即使您正在使用 foreachBatch,您也可能会发现这很有用......

关于apache-spark - 结构化流 OOM,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61951953/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com