gpt4 book ai didi

python - PySpark:当列是列表时向 DataFrame 添加一列

转载 作者:行者123 更新时间:2023-12-04 15:55:22 25 4
gpt4 key购买 nike

我读过类似的问题,但找不到解决我的具体问题的方法。

我有一个 list

l = [1, 2, 3]

和一个数据帧
df = sc.parallelize([
['p1', 'a'],
['p2', 'b'],
['p3', 'c'],
]).toDF(('product', 'name'))

我想获得一个新的 DataFrame,其中列表 l添加为另一列,即
+-------+----+---------+
|product|name| new_col |
+-------+----+---------+
| p1| a| 1 |
| p2| b| 2 |
| p3| c| 3 |
+-------+----+---------+

使用 JOIN 的方法,我在那里加入了 df
 sc.parallelize([[1], [2], [3]])

失败了。使用 withColumn 的方法,如
new_df = df.withColumn('new_col', l)

已失败,因为列表不是 Column目的。

最佳答案

所以,从阅读一些有趣的东西 here ,我已经确定你不能真的只是将一个随机/任意列附加到给定的 DataFrame目的。看来您想要的更多是 zipjoin .我环顾四周,发现this ticket ,这让我觉得你不能zip鉴于您有 DataFrame而不是 RDD对象。

我能够解决您的问题的唯一方法是离开 DataFrame 的世界对象并返回 RDD对象。我还需要为连接创建一个索引,这可能适用于您的用例,也可能不适用于您的用例。

l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)

rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)

# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
new_rdd = rdd_index.join(z).map(lambda (x, y): [y[0][0], y[0][1], y[1]])
new_df = new_rdd.toDF(["product", 'name', 'new_col'])

当我跑 new_df.show() ,我得到:
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+-------+

旁注:我真的很惊讶这不起作用。看起来像外连接?
from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)

当我跑 new_df.show() ,我得到:
+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
| p1| a| 1|
| p1| a| 2|
| p1| a| 3|
| p2| b| 1|
| p3| c| 1|
| p2| b| 2|
| p2| b| 3|
| p3| c| 2|
| p3| c| 3|
+-------+----+------------+

关于python - PySpark:当列是列表时向 DataFrame 添加一列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36132899/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com