gpt4 book ai didi

apache-spark - Pyspark 根据列值复制行

转载 作者:行者123 更新时间:2023-12-05 04:05:27 25 4
gpt4 key购买 nike

我想根据每行给定列的值复制我的 DataFrame 中的所有行,然后为每个新行编制索引。假设我有:

Column A Column B
T1 3
T2 2

我想要的结果是:

Column A Column B Index
T1 3 1
T1 3 2
T1 3 3
T2 2 1
T2 2 2

我能够使用固定值进行类似操作,但不能使用列中的信息。我当前的固定值工作代码是:

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))

我试图改变:

lit(i) for i in range(1, 10) 

lit(i) for i in range(1, df['Column B'])

并将其添加到我的 array() 函数中:

df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))

但它不起作用(TypeError:'Column' 对象不能被解释为整数)。

我应该如何实现?

最佳答案

不幸的是你不能iterate over a Column像那样。您始终可以使用 udf,但我确实有一个非 udf hack 解决方案,如果您使用的是 Spark 2.1 或更高版本,它应该适合您。

诀窍是利用pyspark.sql.functions.posexplode()获取索引值。为此,我们通过重复逗号 Column B 次来创建字符串。然后我们用逗号分割这个字符串,并使用 posexplode 来获取索引。

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = 'SELECT '\
'`Column A`,'\
'`Column B`,'\
'pos AS Index '\
'FROM ( '\
'SELECT DISTINCT '\
'`Column A`,'\
'`Column B`,'\
'posexplode(split(repeat(",", `Column B`), ",")) '\
'FROM df) AS a '\
'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#| T1| 3| 1|
#| T1| 3| 2|
#| T1| 3| 3|
#| T2| 2| 1|
#| T2| 2| 2|
#+--------+--------+-----+

注意:您需要将列名用反引号括起来,因为它们中有空格,如本文所述:How to express a column which name contains spaces in Spark SQL

关于apache-spark - Pyspark 根据列值复制行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51109018/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com