gpt4 book ai didi

apache-spark - Spark : Merge 2 dataframes by adding row index/number on both dataframes

转载 作者:行者123 更新时间:2023-12-04 00:10:58 27 4
gpt4 key购买 nike

问:有什么办法可以在 PySpark 中合并两个数据帧或将数据帧的一列复制到另一列?

例如,我有两个数据框:

DF1              
C1 C2
23397414 20875.7353
5213970 20497.5582
41323308 20935.7956
123276113 18884.0477
76456078 18389.9269

第二个数据框
DF2
C3 C4
2008-02-04 262.00
2008-02-05 257.25
2008-02-06 262.75
2008-02-07 237.00
2008-02-08 231.00

然后我想像这样将 DF2 的 C3 添加到 DF1:
New DF              
C1 C2 C3
23397414 20875.7353 2008-02-04
5213970 20497.5582 2008-02-05
41323308 20935.7956 2008-02-06
123276113 18884.0477 2008-02-07
76456078 18389.9269 2008-02-08

我希望这个例子很清楚。

最佳答案

rownum + window function 即解决方案 1 或 zipWithIndex.map 即解决方案 2 在这种情况下应该有所帮助。

解决方案1:您可以使用窗口函数来获得这个kind of

然后我建议你将 rownumber 作为附加列名添加到 Dataframe 说 df1。

  DF1              
C1 C2 columnindex
23397414 20875.7353 1
5213970 20497.5582 2
41323308 20935.7956 3
123276113 18884.0477 4
76456078 18389.9269 5

第二个数据框
DF2
C3 C4 columnindex
2008-02-04 262.00 1
2008-02-05 257.25 2
2008-02-06 262.75 3
2008-02-07 237.00 4
2008-02-08 231.00 5

现在..做 df1 和 df2 的内部连接,就这样...
你会得到低于输出


像这样的东西
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()

df1 = .... // as showed above df1

df2 = .... // as shown above df2


df11 = df1.withColumn("columnindex", rowNumber().over(w))
df22 = df2.withColumn("columnindex", rowNumber().over(w))

newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
newDF.show()



New DF
C1 C2 C3
23397414 20875.7353 2008-02-04
5213970 20497.5582 2008-02-05
41323308 20935.7956 2008-02-06
123276113 18884.0477 2008-02-07
76456078 18389.9269 2008-02-08

解决方案 2:Scala 中的另一种好方法(可能这是最好的 :)),您可以将其转换为 pyspark:
/**
* Add Column Index to dataframe
*/
def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
// Add Column index
df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
// Create schema
StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)

// Add index now...
val df1WithIndex = addColumnIndex(df1)
val df2WithIndex = addColumnIndex(df2)

// Now time to join ...
val newone = df1WithIndex
.join(df2WithIndex , Seq("columnindex"))
.drop("columnindex")

关于apache-spark - Spark : Merge 2 dataframes by adding row index/number on both dataframes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40508489/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com