dataframe - Spark : Iterating through columns in each row to create a new dataframe


Suppose I have a dataframe like this:

+-----------+-----------+-----------+-----------+------------+
| ColA      | ColB      | ColC      | ColD      | ColE       |
+-----------+-----------+-----------+-----------+------------+
| ''        | sample_1x | sample_1y | ''        | sample_1z  |
| sample2_x | sample2_y | ''        | ''        | ''         |
| sample3_x | ''        | ''        | ''        | sample3_y  |
| sample4_x | sample4_y | ''        | sample4_z | sample4_zz |
| sample5_x | ''        | ''        | ''        | ''         |
+-----------+-----------+-----------+-----------+------------+

I want to create another dataframe that shows, for each row, the left-to-right relationships while skipping columns with empty values. Rows that contain only one valid column value should also be excluded. For example:

+-----------+------------+-----------+
| From      | To         | Label     |
+-----------+------------+-----------+
| sample_1x | sample_1y  | ColB_ColC |
| sample_1y | sample_1z  | ColC_ColE |
| sample2_x | sample2_y  | ColA_ColB |
| sample3_x | sample3_y  | ColA_ColE |
| sample4_x | sample4_y  | ColA_ColB |
| sample4_y | sample4_z  | ColB_ColD |
| sample4_z | sample4_zz | ColD_ColE |
+-----------+------------+-----------+

I think the approach is to write a UDF containing this logic, but I'm not entirely sure how to return a completely new DF, since I'm only used to UDFs that create another column in the same DF. Or is there another Spark function that can handle this case more easily than writing a UDF? I'm using PySpark, in case that matters.

Best Answer

You can use a udf which takes an array as its argument and returns an array of structs, for example:

from pyspark.sql import functions as F

df.show()
+---------+---------+---------+---------+----------+
|     ColA|     ColB|     ColC|     ColD|      ColE|
+---------+---------+---------+---------+----------+
|     null|sample_1x|sample_1y|     null| sample_1z|
|sample2_x|sample2_y|     null|     null|      null|
|sample3_x|     null|     null|     null| sample3_y|
|sample4_x|sample4_y|     null|sample4_z|sample4_zz|
|sample5_x|     null|     null|     null|      null|
+---------+---------+---------+---------+----------+

# columns involved; they will be grouped into an array using F.array(cols)
cols = df.columns

# define a function to convert the array into an array of structs
def find_route(arr, cols):
    # keep only the non-null cells, paired with their column names
    d = [ (cols[i], e) for i, e in enumerate(arr) if e is not None ]
    # build one struct per adjacent pair: From, To and the Label "ColX_ColY"
    return [ {'From': d[i][1], 'To': d[i+1][1], 'Label': d[i][0] + '_' + d[i+1][0]} for i in range(len(d)-1) ]

# set up the UDF and add cols as an extra argument
udf_find_route = F.udf(lambda a: find_route(a, cols), 'array<struct<From:string,To:string,Label:string>>')

# retrieve the data from the array of structs after exploding the array
df.select(F.explode(udf_find_route(F.array(cols))).alias('c1')).select('c1.*').show()
+---------+----------+---------+
|     From|        To|    Label|
+---------+----------+---------+
|sample_1x| sample_1y|ColB_ColC|
|sample_1y| sample_1z|ColC_ColE|
|sample2_x| sample2_y|ColA_ColB|
|sample3_x| sample3_y|ColA_ColE|
|sample4_x| sample4_y|ColA_ColB|
|sample4_y| sample4_z|ColB_ColD|
|sample4_z|sample4_zz|ColD_ColE|
+---------+----------+---------+
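As a side note, the question shows blanks as empty strings while the answer's df.show() prints nulls, and the udf skips cells with `is not None`, so empty strings would need to be converted to nulls first. Below is a minimal, hypothetical sketch (assuming a local SparkSession; the column and sample names are simply taken from the question) that rebuilds the input DataFrame and does that conversion before applying the udf approach above:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master('local[*]').getOrCreate()

# rebuild the sample data from the question; '' marks a blank cell
data = [
    ('',          'sample_1x', 'sample_1y', '',          'sample_1z'),
    ('sample2_x', 'sample2_y', '',          '',          ''),
    ('sample3_x', '',          '',          '',          'sample3_y'),
    ('sample4_x', 'sample4_y', '',          'sample4_z', 'sample4_zz'),
    ('sample5_x', '',          '',          '',          ''),
]
df = spark.createDataFrame(data, ['ColA', 'ColB', 'ColC', 'ColD', 'ColE'])

# the udf filters on `is not None`, so turn empty strings into nulls first
df = df.select([F.when(F.col(c) != '', F.col(c)).alias(c) for c in df.columns])
df.show()

Note also that a row with only one non-null cell (like sample5_x) makes find_route return an empty array, and F.explode drops rows whose array is empty, so no extra filter is needed to exclude those rows.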

Regarding dataframe - Spark: Iterating through columns in each row to create a new dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57420752/
