gpt4 book ai didi

scala - 如何迭代 Spark DataFrame 行?

转载 作者:行者123 更新时间:2023-12-04 16:07:27 24 4
gpt4 key购买 nike

我需要遍历 DataFrame 行。

我不想每次都将它转换成 RDD 并过滤所需的行,例如:

var index = 0
def next = {
val result = df.rdd.filter(_._2 == index).collect.map(_._1).headOption
index += 1
result
}

有一个选项可以调用“collect”方法,它将返回 Array[Row],并对其进行迭代,但我相信当有大量数据时它不会成​​立。

val rowsIterator:Iterator[Row] = df.collect().iterator
rowsIterator.next

更新:我被要求提供更多信息:我希望将每一行写入我的数据库(在我的例子中是 ES),但为了使系统更稳定,我不使用背压来完成。

最佳答案

好吧,你可以这样做:

val df = ss.range(10000).toDF("i")

val dfEnumerated = df
.withColumn("row_number", row_number().over(Window.orderBy(lit(1))))
.cache()

val collectRnb = (rnb:Int) => dfEnumerated.where($"rnb"===rnb).drop($"rnb").collect.headOption.map(_.getLong(0))
val records : Iterator[Option[Long]] = Iterator.from(1).map(collectRnb)

def next = records.next

但在处理非常大的数据集时,这也会成为问题,因为我使用了一个窗口函数 (row_number) 而没有指定分区,因此这不能很好地扩展。

您还可以结合使用不同的方法,例如一次收集 1 个分区并遍历此数组。

编辑:

从 Spark 2.0 开始,您可以使用 .toLocalIterator() 来按分区收集数据:

Return an iterator that contains all of Rows in this Dataset. The iterator will consume as much memory as the largest partition in this Dataset

关于scala - 如何迭代 Spark DataFrame 行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48149310/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com