gpt4 book ai didi

scala - 迭代 Spark 数据框中的行和列

转载 作者:行者123 更新时间:2023-12-03 03:38:12 24 4
gpt4 key购买 nike

我有以下动态创建的 Spark 数据框:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

现在,我需要迭代 sqlDF 中的每一行和每一列来打印每一列,这是我的尝试:

sqlDF.foreach { row =>
row.foreach { col => println(col) }
}

rowRow 类型,但不可迭代,这就是此代码在 row.foreach 中引发编译错误的原因。如何迭代Row中的每一列?

最佳答案

假设您有一个Dataframe像下面这样

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy| aaa| 20|
|Berta| bbb| 30|
| Joe| ccc| 40|
+-----+------+---+

要循环 Dataframe 并从 Dataframe 中提取元素,您可以选择以下方法之一。

方法 1 - 使用 foreach 循环

直接使用 foreach 循环数据帧循环是不可能的。为此,首先您必须使用 case class 定义数据帧的架构。然后您必须将此架构指定给数据框。

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

请查看下面的结果:

enter image description here

方法 2 - 使用 rdd 循环

使用rdd.collect在您的数据框之上。 row变量将包含rddDataframe的每一行行类型。要获取行中的每个元素,请使用 row.mkString(",")它将包含以逗号分隔的每行的值。使用split函数(内置函数)您可以访问 rdd 的每一列值带索引的行。

for (row <- df.rdd.collect)
{
var name = row.mkString(",").split(",")(0)
var sector = row.mkString(",").split(",")(1)
var age = row.mkString(",").split(",")(2)
}

请注意,这种方法有两个缺点。
1、如果有,在列值中,数据将被错误地分割到相邻列。
2.rdd.collectaction它将所有数据返回到驱动程序的内存,而驱动程序的内存可能没有那么大来容纳数据,最终导致应用程序失败。

我建议使用方法 1

方法 3 - 使用 where 并选择

您可以直接使用whereselect它将在内部循环并查找数据。由于它不应该抛出 Index 越界异常,因此使用 if 条件

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString

方法 4 - 使用临时表

您可以将数据帧注册为temptable,它将存储在spark的内存中。然后您可以像其他数据库一样使用选择查询来查询数据,然后收集并保存在变量中

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")

关于scala - 迭代 Spark 数据框中的行和列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49252670/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com