gpt4 book ai didi

performance - Spark "first"窗口函数花费的时间比 "last"长得多

转载 作者:行者123 更新时间:2023-12-04 12:53:34 27 4
gpt4 key购买 nike

我正在使用 pyspark 例程来插入配置表中的缺失值。
想象一个从 0 到 50,000 的配置值表。用户指定介于两者之间的几个数据点(例如 0、50、100、500、2000、500000),然后我们对余数进行插值。我的解决方案主要遵循 this blog post非常接近,除了我没有使用任何 UDF。
在对它的性能进行故障排除时(大约需要 3 分钟),我发现一个特定的窗口函数正在占用所有时间,而我所做的其他一切只需要几秒钟。
这是主要感兴趣的领域 - 我使用窗口函数来填充上一个和下一个用户提供的配置值:

from pyspark.sql import Window, functions as F

# Create partition windows that are required to generate new rows from the ones provided
win_last = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0)
win_next = Window.partitionBy('PORT_TYPE', 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing)

# Join back in the provided config table to populate the "known" scale factors
df_part1 = (df_scale_factors_template
.join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter')
# Add computed columns that can lookup the prior config and next config for each missing value
.withColumn('last_rank', F.last( F.col('rank'), ignorenulls=True).over(win_last))
.withColumn('last_sf', F.last( F.col('scale_factor'), ignorenulls=True).over(win_last))
).cache()
debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1

df_part2 = (df_part1
.withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
.withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
).cache()
debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2

df_part3 = (df_part2
# Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * (x-x1)
.withColumn('scale_factor',
F.when(F.col('last_rank')==F.col('next_rank'), F.col('last_sf')) # Handle div/0 case
.otherwise(F.col('last_sf') + ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) * (F.col('rank')-F.col('last_rank'))))
.select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor')
).cache()
debug_log_dataframe(df_part3, 'df_part3', explain: True) # Force a .count() and time Part3
上面曾经是一个单一的链式数据帧语句,但我已经将它分成了 3 部分,以便我可以隔离需要很长时间的部分。结果是:
  • Part 1: Generated 8 columns and 300006 rows in 0.65 seconds
  • Part 2: Generated 10 columns and 300006 rows in 189.55 seconds
  • Part 3: Generated 4 columns and 300006 rows in 0.24 seconds

  • 为什么我的电话是 first()Window.unboundedFollowing花费比 last() 更长的时间在 Window.unboundedPreceding ?

    避免问题/疑虑的一些注意事项:
  • debug_log_dataframe只是一个辅助函数,用于使用 .Count() 强制执行数据帧/缓存并计时以产生上述日志。
  • 我们实际上一次操作 6 个 50001 行的配置表(因此是分区和行数)
  • 作为健全性检查,我排除了 cache() 的影响。明确重用 unpersist()在为后续运行计时之前 - 我对上述测量非常有信心。

  • 实物图 :
    为了帮助回答这个问题,我调用 explain()根据第 3 部分的结果,除其他外,确认缓存具有预期效果。这里有注释以突出问题区域:
    explain
    我能看到的唯一区别是:
  • 前两次调用(到 last )显示 RunningWindowFunction ,而对 next 的调用刚读 Window
  • 第 1 部分旁边有一个 *(3),但第 2 部分没有。

  • 我尝试过的一些事情 :
  • 我尝试进一步将第 2 部分拆分为单独的数据帧 - 结果是每个 first语句占用总时间的一半(~98 秒)
  • 我尝试颠倒生成这些列的顺序(例如,在调用“first”之后调用“last”),但没有区别。无论哪个数据帧最终包含对 first 的调用是慢的。

  • 我觉得我已经做了尽可能多的挖掘工作,并且有点希望 Spark 专家能够看看这个时间是从哪里来的。

    最佳答案

    不回答问题的解决方案
    在尝试各种方法来加速我的日常工作时,我想到尝试重写我对 first() 的用法。只是 last() 的用法以相反的排序顺序。
    所以重写这个:

    win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
    .orderBy('rank').rowsBetween(0, Window.unboundedFollowing))

    df_part2 = (df_part1
    .withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
    .withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
    )
    像这样:
    win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
    .orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))

    df_part2 = (df_part1
    .withColumn('next_rank', F.last(F.col('rank'), ignorenulls=True).over(win_next))
    .withColumn('next_sf', F.last(F.col('scale_factor'), ignorenulls=True).over(win_next))
    )
    令我惊讶的是,这实际上解决了性能问题,现在整个数据帧在短短 3 秒内生成。我很高兴,但仍然很烦恼。
    正如我预测的那样,查询计划现在在创建接下来的两列之前包括一个新的 SORT 步骤,并且它们已经从 Window 改变了。至 RunningWindowFunction作为前两个。这是新计划(不再将代码分解为 3 个单独的缓存部分,因为那只是为了解决性能问题):
    enter image description here
    至于问题:

    Why do my calls to first() over Window.unboundedFollowing take so much longer than last() over Window.unboundedPreceding?


    出于学术原因,我希望有人仍然可以回答这个问题

    关于performance - Spark "first"窗口函数花费的时间比 "last"长得多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69308560/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com