
python - Dataframe performance issue when retrieving rows in hierarchical order in pyspark




I am trying to retrieve data from a csv file in hierarchical order using a pyspark dataframe, but retrieving 30k records in hierarchical order takes more than 3 hours.

Is there another way to solve this with a pyspark dataframe?

Can someone help me with this?

from datetime import datetime
from pyspark.sql.functions import col, lit

df = spark.read.csv(path/of/csv/file, **kargs)
df.cache()
df.show()

def get_child(pid, df, col_name):
    # select the row(s) whose parent id matches pid
    df_child_s = df.selectExpr(col_name).where(col("pid") == pid)
    return df_child_s


def all_data(pid, df, col_name):
    df_child_exist = True
    cnt = 0
    df_o = get_child(pid, df, col_name)
    df_o = df_o.withColumn("order_id", lit(cnt))

    df_child_exist = len(df_o.take(1)) >= 1
    if df_child_exist:
        dst = df_o.selectExpr("child_id").first()[0]

    while df_child_exist:
        cnt += 1

        df_o2 = get_child(dst, df, "*")
        df_o2 = df_o2.withColumn("order_id", lit(cnt))

        df_child_exist = len(df_o2.take(1)) >= 1
        if df_child_exist:
            dst = df_o2.selectExpr("child_id").first()[0]
            df_o = df_o.union(df_o2)

    return df_o


pid = 0
start = datetime.now()
df_f_1 = all_data(pid, df, "*")
df_f_1.show()
end = datetime.now()
totalTime = end - start
print(f"total execution time :{totalTime}")
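Every `take(1)` and `first()` in the loop above launches a separate Spark job, so a 30k-row chain triggers tens of thousands of jobs. A minimal driver-side sketch of an alternative, assuming the (child_id, pid) pairs fit in driver memory (a single `df.select("child_id", "pid").collect()` would supply them); the names `child_of` and `ordered_rows` are illustrative, not from the question:

```python
# Illustrative driver-side rewrite: collect the (child_id, pid) pairs once,
# then walk the parent -> child chain in plain Python instead of launching
# a Spark job per row. Pairs below are the sample data from the question.
pairs = [(248278, 264543), (251713, 252689), (252689, 248278),
         (258977, 251713), (264543, 0)]

# parent id -> child id lookup (assumes each parent has at most one child)
child_of = {pid: child for child, pid in pairs}

def ordered_rows(root):
    """Return (child_id, pid, order_id) rows, walking down from the root parent."""
    rows, cur, level = [], root, 0
    while cur in child_of:
        child = child_of[cur]
        rows.append((child, cur, level))
        cur, level = child, level + 1
    return rows

print(ordered_rows(0))
```

On the sample data this walk produces the five rows of the expected output in one pass, and the result can be handed back to `spark.createDataFrame` if a dataframe is needed downstream.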
**csv file data:**

child_id  pid
248278    264543
251713    252689
252689    248278
258977    251713
264543    0

**Expected output:**

child_id  pid
264543    0
248278    264543
252689    248278
251713    252689

or

+------+------+-----+
|   dst|   src|level|
+------+------+-----+
|264543|     0|    0|
|248278|264543|    1|
|252689|248278|    2|
|251713|252689|    3|
|258977|251713|    4|
+------+------+-----+

Best Answer

Raj, here is my GraphFrame answer as requested.

I thought there would be a simpler way to do this with GraphFrames, but I did not find a simple way to fetch all of the descendants, so I am providing two solutions.

from graphframes import GraphFrame
from pyspark.sql.functions import col

# initial dataframe
edgesDf = spark.createDataFrame(
    [
        (248278, 264543),
        (251713, 252689),
        (252689, 248278),
        (258977, 251713),
        (264543, 0)
    ],
    ["dst", "src"]
)

# get all ids as vertices
verticesDf = edgesDf.select(col("dst").alias("id")).union(edgesDf.select("src")).distinct()

# create graphFrame
graphGf = GraphFrame(verticesDf, edgesDf)

# for performance
sc.setCheckpointDir("/tmp/checkpoints")
graphGf.cache()

#### Motif approach
# note that this requires knowing the depth of the tree
fullPathDf = graphGf.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d); (d)-[de]->(e); (e)-[ef]->(f)")

# pivot
edgeDf = fullPathDf.select(col("ab").alias("edge")).union(fullPathDf.select("bc")).union(fullPathDf.select("cd")).union(fullPathDf.select("de")).union(fullPathDf.select("ef"))

# Result
edgeDf.select("edge.dst", "edge.src").show()

### Breadth First Search approach
#
# Does not require knowing the depth, but does require knowing the id of the leaf node
pathDf = graphGf.bfs("id = 0", "id = 258977", maxPathLength = 5)

# pivot
edgeDf = pathDf.select(col("e0").alias("edge")).union(pathDf.select("e1")).union(pathDf.select("e2")).union(pathDf.select("e3")).union(pathDf.select("e4"))

#
edgeDf.select("edge.dst", "edge.src").show()
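For readers without a Spark cluster handy, the result both GraphFrames approaches compute can be mirrored in plain Python: a breadth-first walk over the same (dst, src) edges that labels each edge with its depth from the root id 0. Unlike a simple chain walk, BFS also handles a parent with multiple children. The helper name `bfs_levels` is illustrative, not part of the GraphFrames API:

```python
# Plain-Python sketch of the BFS result: walk the tree from the root (id 0)
# breadth-first and emit (dst, src, level) rows, matching the expected table.
from collections import deque

edges = [(248278, 264543), (251713, 252689), (252689, 248278),
         (258977, 251713), (264543, 0)]

# parent id -> list of child ids (supports branching, unlike a flat dict)
children = {}
for dst, src in edges:
    children.setdefault(src, []).append(dst)

def bfs_levels(root):
    out, queue = [], deque([(root, 0)])
    while queue:
        node, level = queue.popleft()
        for child in children.get(node, []):
            out.append((child, node, level))
            queue.append((child, level + 1))
    return out

print(bfs_levels(0))
```

This is what `graphGf.bfs(...)` computes distributedly; for a 30k-row hierarchy that fits on the driver, the local version avoids per-level Spark jobs entirely.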

For python - Dataframe performance issue when retrieving rows in hierarchical order in pyspark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53857048/
