作者热门文章
- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我有 JSON 数据,我正在将这些数据读入一个包含多个字段的数据框中,根据两列对其进行重新分区,然后转换为 Pandas。
这项作业在仅 600,000 行数据上的 EMR 上不断失败,并带有一些模糊的错误。我还增加了 Spark 驱动程序的内存设置,但仍然看不到任何分辨率。
这是我的 pyspark 代码:
enhDataDf = (
sqlContext
.read.json(sys.argv[1])
)
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
enhDataDf
.toJSON()
.saveAsTextFile(sys.argv[2])
)
我的spark设置如下:
conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
我得到的错误是:
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
该代码在最多约 600,000 行 JSON 行上运行良好 - 即使有大量可用内存。然后,它一直失败。
对正在发生的事情以及如何调试/修复此问题有任何想法吗?
最佳答案
我认为问题出在您代码的以下部分:
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
.toPandas()
收集数据,所以当记录数增加时,会导致驱动失败。
根据您的评论,这是您使用的确切管道。这意味着整个阶段不仅过时而且不正确。当数据被收集并进一步并行化时,可以保证由
创建的分区.repartition('column1', 'column2')
将在您重新创建 Spark DataFrame
时保留:
sqlContext.createDataFrame(enhDataDf)
如果要按列写入数据,可以直接进行:
(sqlContext
.read.json(sys.argv[1])
.repartition('column1', 'column2')
.write
.json(sys.argv[2]))
跳过中间 toPandas
并转换为 RDD。
关注您的评论:
如果 toPandas
服务于某个目的,那么它将始终是管道中的一个限制因素,唯一直接的解决方案是扩大驱动程序节点。根据您对收集的数据应用的确切算法,您可以考虑其他选项:
关于apache-spark - Pyspark 简单的重新分区和 toPandas() 未能在 600,000+ 行上完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39811054/
我是一名优秀的程序员,十分优秀!