gpt4 book ai didi

apache-spark - Pyspark 简单的重新分区和 toPandas() 未能在 600,000+ 行上完成

转载 作者:IT王子 更新时间:2023-10-28 23:36:03 26 4
gpt4 key购买 nike

我有 JSON 数据,我正在将这些数据读入一个包含多个字段的数据框中,根据两列对其进行重新分区,然后转换为 Pandas。

这项作业在仅 600,000 行数据上的 EMR 上不断失败,并带有一些模糊的错误。我还增加了 Spark 驱动程序的内存设置,但仍然看不到任何分辨率。

这是我的 pyspark 代码:

enhDataDf = (
sqlContext
.read.json(sys.argv[1])
)

enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
enhDataDf
.toJSON()
.saveAsTextFile(sys.argv[2])
)

我的spark设置如下:

conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)

我得到的错误是:

16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.

该代码在最多约 600,000 行 JSON 行上运行良好 - 即使有大量可用内存。然后,它一直失败。

对正在发生的事情以及如何调试/修复此问题有任何想法吗?

最佳答案

我认为问题出在您代码的以下部分:

enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)

.toPandas() 收集数据,所以当记录数增加时,会导致驱动失败。

根据您的评论,这是您使用的确切管道。这意味着整个阶段不仅过时而且不正确。当数据被收集并进一步并行化时,可以保证由

创建的分区
.repartition('column1', 'column2')

将在您重新创建 Spark DataFrame 时保留:

sqlContext.createDataFrame(enhDataDf)

如果要按列写入数据,可以直接进行:

(sqlContext
.read.json(sys.argv[1])
.repartition('column1', 'column2')
.write
.json(sys.argv[2]))

跳过中间 toPandas 并转换为 RDD。

关注您的评论:

如果 toPandas 服务于某个目的,那么它将始终是管道中的一个限制因素,唯一直接的解决方案是扩大驱动程序节点。根据您对收集的数据应用的确切算法,您可以考虑其他选项:

  • 您在 Spark 上使用的重新实现算法尚不可用。
  • 考虑具有更好的 SciPy 堆栈互操作性的替代框架(如 Dask)。

关于apache-spark - Pyspark 简单的重新分区和 toPandas() 未能在 600,000+ 行上完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39811054/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com