gpt4 book ai didi

apache-spark - 一个多小时执行pyspark.sql.DataFrame.take(4)

转载 作者:行者123 更新时间:2023-12-04 04:00:57 24 4
gpt4 key购买 nike

我在3个具有4个内核和16GB RAM的虚拟机(即1个主服务器; 2个从属服务器)上运行spark 1.6。

我可以看到在spark-master webUI上注册的工作人员。

我想从Vertica数据库中检索数据以对其进行处理。由于我无法运行复杂的查询,因此尝试使用虚拟查询来理解。我们认为这是一项简单的任务。

我的代码是:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)

输出为(请注意:我将slavt VM IP:Port替换为 @IPSLAVE):
16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

如您所见,这花费了很长时间。
我的表实际上很大(存储约2.2亿行,每行11个字段),但是使用“普通” sql(例如pyodbc)可以立即执行这样的查询。

我想我误会了/错过了Spark,您是否有这样的想法或建议来使其更有效?

最佳答案

尽管Spark在JDBC上支持有限的谓词下推,所有其他操作(例如limit,group,aggregation)都在内部执行。不幸的是,这意味着take(4)将首先获取数据,然后再应用limit。换句话说,您的数据库将执行以下操作(假设没有投影过滤器):

SELECT * FROM table 

其余的将由Spark处理。涉及一些优化(特别是Spark evaluates partitions iteratively以获取 LIMIT请求的记录数),但是与数据库端优化相比,它的效率仍然很低。

如果要将 limit推送到数据库,则必须使用子查询作为 dbtable参数静态地完成此操作:

(sqlContext.read.format('jdbc')
.options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))

sqlContext.read.format("jdbc").options(Map(
"url" -> "xxxx",
"dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))

请注意,子查询中的别名是必需的。

注意:

一旦Data Source API v2准备就绪,将来可以改善此行为:
  • SPARK-15689
  • SPIP: Data Source API V2
  • 关于apache-spark - 一个多小时执行pyspark.sql.DataFrame.take(4),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35869884/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com