
scala - Joining Spark DataFrames on a key


I have built two DataFrames. How can we join multiple Spark DataFrames?

For example:

PersonDf and ProfileDf, where the common column personId is the key. Now, how can we get a single DataFrame that combines PersonDf and ProfileDf?

Best Answer

Alias approach using Scala (this example is for an older version of Spark; for Spark 2.x, see my other answer):

You can use case classes to prepare a sample dataset... this is optional: you could, for example, also get a DataFrame from hiveContext.sql.

import org.apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid: Int)

case class Profile(name: String, personid: Int, profileDescription: String)

val df1 = sqlContext.createDataFrame(
  Person("Bindu", 20, 2)
    :: Person("Raphel", 25, 5)
    :: Person("Ram", 40, 9)
    :: Nil)


val df2 = sqlContext.createDataFrame(
  Profile("Spark", 2, "SparkSQLMaster")
    :: Profile("Spark", 5, "SparkGuru")
    :: Profile("Spark", 9, "DevHunter")
    :: Nil)

// you can use aliases to refer to column names, which increases readability

val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")


val joined_df = df_asPerson.join(
  df_asProfile,
  col("dfperson.personid") === col("dfprofile.personid"),
  "inner")


joined_df.select(
  col("dfperson.name"),
  col("dfperson.age"),
  col("dfprofile.name"),
  col("dfprofile.profileDescription")
).show
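With the sample data above, the inner join matches all three personid values, so show should print something along these lines:

+------+---+-----+------------------+
|  name|age| name|profileDescription|
+------+---+-----+------------------+
| Bindu| 20|Spark|    SparkSQLMaster|
|Raphel| 25|Spark|         SparkGuru|
|   Ram| 40|Spark|         DevHunter|
+------+---+-----+------------------+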

Temp table approach, which I personally don't like...

The reason to use the registerTempTable(tableName) method for a DataFrame is that, in addition to being able to use the Spark-provided methods of a DataFrame, you can also issue SQL queries via the sqlContext.sql(sqlQuery) method that use that DataFrame as an SQL table. The tableName parameter specifies the table name to use for that DataFrame in the SQL queries.

df_asPerson.registerTempTable("dfperson")
df_asProfile.registerTempTable("dfprofile")

sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
FROM dfperson JOIN dfprofile
ON dfperson.personid == dfprofile.personid""")
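Since sqlContext.sql also returns a DataFrame, you can chain .show on the query above to display the joined rows, for example:

// Same query as above, displaying the result
sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
                  FROM dfperson JOIN dfprofile
                  ON dfperson.personid = dfprofile.personid""").show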

If you want to learn more about joins, see this nice article: beyond-traditional-join-with-apache-spark


Note:

1) As mentioned by @RaphaelRoth, val resultDf = PersonDf.join(ProfileDf, Seq("personId")) is a good approach, since it does not produce duplicate columns from both sides when you do an inner join on the same key.

2) A Spark 2.x example, covering the full set of join operations supported by Spark 2.x with examples and results, is available in my other answer.
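A minimal sketch of that approach, using the df1 and df2 defined above (the key column is personid here, matching the case class field name):

// Joining on a Seq of column names keeps a single personid column in the result
val resultDf = df1.join(df2, Seq("personid"))
resultDf.show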

Tip:

Also, an important point for joins: the broadcast function can help by giving the optimizer a hint; please see my answer.
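A minimal sketch, assuming df2 (the profile data above) is small enough to be shipped to every executor:

import org.apache.spark.sql.functions.broadcast

// Mark df2 as broadcastable so Spark can use a broadcast (map-side) join
// instead of shuffling both sides of the join
val broadcastJoined = df1.join(broadcast(df2), Seq("personid"))
broadcastJoined.show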

About scala - Joining Spark DataFrames on a key: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40343625/
