gpt4 book ai didi

python - 使用 Pyspark 从关系数据集构建层次结构

转载 作者:行者123 更新时间:2023-12-03 23:06:26 33 4
gpt4 key购买 nike

我是 Python 新手,并坚持从关系数据集构建层次结构。
如果有人对如何进行此操作有想法,那将是非常有帮助的。
我有一个包含数据的关系数据集

_currentnode,  childnode_  
root, child1
child1, leaf2
child1, child3
child1, leaf4
child3, leaf5
child3, leaf6
很快。我正在寻找一些 python 或 pyspark 代码
构建如下所示的层次结构数据框
_level1, level2,  level3,  level4_  
root, child1, leaf2, null
root, child1, child3, leaf5
root, child1, child3, leaf6
root, child1, leaf4, null
数据是字母数字,是一个巨大的数据集[~5000万条记录]。
此外,层次结构的根是已知的,可以在代码中进行硬连线。
因此,在上面的示例中,层次结构的根是“root”。

最佳答案

Pyspark 的最短路径
输入数据可以解释为具有 currentnode 之间连接的图形。和 childnode .那么问题是根节点和所有叶节点之间的最短路径是什么,称为single source shortest path .
Spark有Graphx处理图的并行计算。不幸的是,GraphX 没有提供 Python API(更多细节可以在 here 中找到)。支持 Python 的图形库是 GraphFrames . GraphFrames 使用了 GraphX 的一部分。
GraphX 和 GraphFrames 都为 sssp 提供了解决方案。不幸的是,这两种实现都只返回最短路径的长度,而不是路径本身( GraphXGraphFrames )。但是this answer为 GraphX 和 Scala 提供算法的实现,该算法也返回路径。所有三个解决方案都使用 Pregel .
将上述答案翻译为 GraphFrames/Python:
1. 数据准备
为所有节点提供唯一 ID 并更改列名称,使其适合所描述的名称 here

import pyspark.sql.functions as F

df = ...

vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()

edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
.join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache()
Nodes                   Edges
+------+------------+ +-----------+---------+------------+------------+
| node| id| |currentnode|childnode| src| dst|
+------+------------+ +-----------+---------+------------+------------+
| leaf2| 17179869184| | child1| leaf4| 25769803776|249108103168|
|child1| 25769803776| | child1| child3| 25769803776| 68719476736|
|child3| 68719476736| | child1| leaf2| 25769803776| 17179869184|
| leaf6|103079215104| | child3| leaf6| 68719476736|103079215104|
| root|171798691840| | child3| leaf5| 68719476736|214748364800|
| leaf5|214748364800| | root| child1|171798691840| 25769803776|
| leaf4|249108103168| +-----------+---------+------------+------------+
+------+------------+
2. 创建 GraphFrame
from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)
3. 创建将构成 Pregel 算法单个部分的 UDF
消息类型:
from pyspark.sql.types import *
vertColSchema = StructType()\
.add("dist", DoubleType())\
.add("node", StringType())\
.add("path", ArrayType(StringType(), True))
顶点程序:
def vertexProgram(vd, msg):
if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
else:
return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)
传出的消息:
def sendMsgToDst(src, dst):
srcDist = src.__getitem__(0)
dstDist = dst.__getitem__(0)
if srcDist < (dstDist - 1):
return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
else:
return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)
消息聚合:
def aggMsgs(agg):
shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)
4. 组合零件
from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
.otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
.sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
.aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
.setMaxIter(10) \
.setCheckpointInterval(2) \
.run()
result.select("vertCol.path").show(truncate=False)
评论:
  • maxIter应设置为至少与最长路径一样大的值。数值越大,结果不变,但计算时间变长。如果该值太小,则结果中将缺少较长的路径。当前版本的 GraphFrames (0.8.0) 不支持在不再发送新消息时停止循环。
  • checkpointInterval应设置为小于 maxIter 的值.实际值取决于数据和可用硬件。当出现 OutOfMemory 异常或 Spark session 挂起一段时间时,该值可能会减小。

  • 最终结果是带有内容的常规数据框
    +-----------------------------+
    |path |
    +-----------------------------+
    |[root, child1] |
    |[root, child1, leaf4] |
    |[root, child1, child3] |
    |[root] |
    |[root, child1, child3, leaf6]|
    |[root, child1, child3, leaf5]|
    |[root, child1, leaf2] |
    +-----------------------------+
    如有必要,可以在此处过滤掉非叶节点。

    关于python - 使用 Pyspark 从关系数据集构建层次结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62450917/

    33 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com