gpt4 book ai didi

python - 如何从 pyspark 中的另一列中查找一列的顶级层次结构?

转载 作者:行者123 更新时间:2023-12-01 07:20:55 26 4
gpt4 key购买 nike

我想查找组织中员工的顶层层次结构并使用 pyspark 分配报告级别?

我们已经使用 Spark GraphX 通过 Scala 支持解决了这个问题。我们想在 python 中执行此操作,但不使用 Graphframes(DF 的首选)。是否可以使用 Spark DF 来做到这一点?如果没有,那么我们将选择 Graphframe。

有 2 个 DF,即,employee_df 和 required_hierarchy_df

  1. employee_df 包含组织中所有员工的所有信息。
  2. required_hierarchy_df 包含我们需要查找其在组织中的最高层次结构的员工。

请引用下面的例子:

required_hierarchy_df:

employee_id | designation | supervisor_id | supervisor_designation
10 | Developer | 05 | Techincal Lead

员工_df:

employee_id  | designation | supervisor_id  | supervisor_designation
10 | Developer | 05 | Techincal Lead
05 | Technical Lead | 04 | Manager
04 | Director | 03 | Sr. Director
03 | Sr. Director| 02 | Chairman
02 | Chairman | 01 | CEO
01 | CEO | null | null

预期输出:

员工的报告级别:

报告级别_df:

employee_id | level_1_id | level_2_id | level_3_id | level_4_id | level_5_id
10 | 05 | 04 | 03 | 02 | 01

组织中的顶级层次结构信息:

top_level_df:

employee_id | designation | top_level_id | top_level_designation
10 | Developer | 01 | CEO

最佳答案

考虑不使用 Spark,因为它只有 200 万行。使用字典/图形/树状数据结构使这变得非常简单。我建议不要使用 Spark DataFrames 来执行此操作。

使用 Spark DataFrames,您可以通过递归联接来解决此问题,创建数据帧 report_level_df。这不是一个好的和/或有效的解决方案

代码

我们对员工与主管的关系感兴趣。

edges = employee_df.select('employee_id', 'supervisor_id')

可以说,向上迈出一步,需要一次连接

level_0 = edges \
.withColumnRenamed('employee_id', 'level_0') \
.withColumnRenamed('supervisor_id', 'level_1')

level_1 = edges \
.withColumnRenamed('employee_id', 'level_1') \
.withColumnRenamed('supervisor_id', 'level_2')

# Join, sort columns and show
level_0 \
.join(level_1, on='level_1') \
.select('level_0', 'level_1', 'level_2') \
.show()

我们想要递归地沿着链向上遍历它们。

total = edges \
.withColumnRenamed('employee_id', 'level_0') \
.withColumnRenamed('supervisor_id', 'level_1')

levels = 10

for i in range(1, levels):
level_i = edges \
.withColumnRenamed('employee_id', 'level_{}'.format(i)) \
.withColumnRenamed('supervisor_id', 'level_{}'.format(i+1))

total = total \
.join(level_i, on='level_{}'.format(i), how='left')

# Sort columns and show
total \
.select(['level_{}'.format(i) for i in range(levels)]) \
.show()

除了我们不想猜测关卡数量,因此我们每次都会检查是否已完成。这需要运行所有数据,因此速度很慢。

schema = 'employee_id int, supervisor_id int'
edges = spark.createDataFrame([[10, 5], [5, 4], [4, 3], [3, 2], [2, 1], [1, None]], schema=schema)

total = edges \
.withColumnRenamed('employee_id', 'level_0') \
.withColumnRenamed('supervisor_id', 'level_1')

i = 1

while True:
this_level = 'level_{}'.format(i)
next_level = 'level_{}'.format(i+1)
level_i = edges \
.withColumnRenamed('employee_id', this_level) \
.withColumnRenamed('supervisor_id', next_level)

total = total \
.join(level_i, on=this_level, how='left')

if total.where(f.col(next_level).isNotNull()).count() == 0:
break
else:
i += 1

# Sort columns and show
total \
.select(['level_{}'.format(i) for i in range(i+2)]) \
.show()

结果

+-------+-------+-------+-------+-------+-------+-------+
|level_5|level_4|level_3|level_2|level_1|level_0|level_6|
+-------+-------+-------+-------+-------+-------+-------+
| null| null| null| null| null| 1| null|
| null| null| null| null| 1| 2| null|
| null| null| null| 1| 2| 3| null|
| null| null| 1| 2| 3| 4| null|
| null| 1| 2| 3| 4| 5| null|
| 1| 2| 3| 4| 5| 10| null|
+-------+-------+-------+-------+-------+-------+-------+

关于python - 如何从 pyspark 中的另一列中查找一列的顶级层次结构?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57706410/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com