gpt4 book ai didi

sql - Spark SQL 超时

转载 作者:可可西里 更新时间:2023-11-01 16:13:03 30 4
gpt4 key购买 nike

我正在尝试在 Spark 独立集群上运行一个相对简单的 Spark SQL 命令

select a.name, b.name, s.score
from score s
inner join A a on a.id = s.a_id
inner join B b on b.id = s.b_id
where pmod(a.id, 3) != 3 and pmod(b.id, 3) != 0

表格大小如下

A: 25,000
B: 2,500,000
score: 25,000,000

因此,据此我希望得到 25,000,000 行的结果。我想用 Spark SQL 运行这个查询,然后处理每一行。这是相关的 Spark 代码

val sqlContext = new HiveContext(sc)
val sql = "<above SQL>"
sqlContext.sql(sql).first

这条命令在score表的大小为200,000时运行正常,但现在不运行。下面是相关日志

14/12/04 16:35:14 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:35:43 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:36:24 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:37:11 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:38:13 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:39:19 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:39:48 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
14/12/04 16:40:08 WARN MemoryStore: Not enough space to store block broadcast_12 in memory! Free memory is 1938057068 bytes.
14/12/04 16:40:08 WARN MemoryStore: Persisting block broadcast_12 to disk instead.
java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.BroadcastHashJoin.execute(joins.scala:431)
at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:42)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:111)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:440)
at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:103)
at org.apache.spark.rdd.RDD.first(RDD.scala:1092)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC$$iwC.<init>(<console>:25)
at $iwC$$iwC.<init>(<console>:27)
at $iwC.<init>(<console>:29)
at <init>(<console>:31)
at .<init>(<console>:35)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我最初的想法是增加超时时间,但如果不重新编译源代码,这看起来是不可能的,如图 here .在父目录中,我也看到了一些不同的连接,但我不确定如何让 spark 来使用其他类型的连接。

我还试图通过将 spark.executor.memory 增加到 10g 来修复关于持久化到磁盘的第一个警告,但这并没有解决问题。

有谁知道我如何实际运行这个查询?

最佳答案

也许您遇到了广播连接超时的问题。出于某种原因,它是一个未记录的配置选项,名为 spark.sql.broadcastTimeout(默认为 300 秒)。

因此您可以尝试增加它(为我们工作),或者让 Spark 不进行广播连接(尽管建议将小表连接到大表,请参阅 https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html)。

关于sql - Spark SQL 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27306896/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com