
apache-spark-sql - Spark SQL "Futures timed out after 300 seconds" when filtering


I am getting an exception when running a seemingly simple Spark SQL filter job:

    someOtherDF
      .filter(/* some condition */)
      .select($"eventId")
      .createOrReplaceTempView("myTempTable")

    records
      .filter(s"eventId NOT IN (SELECT eventId FROM myTempTable)")

Any idea how to solve this?

Note:

  • someOtherDF contains roughly 1M to 5M rows after filtering, and eventId is a GUID.
  • records contains 40M to 50M rows.

Error:

Stacktrace:

org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:212)
... 84 more
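For context, the BroadcastExchangeExec and BroadcastNestedLoopJoinExec frames show that the NOT IN subquery was planned as a broadcast join, and the 300 seconds match the default of spark.sql.broadcastTimeout. A minimal sketch of the related session settings (the values and app name here are illustrative; raising the timeout only buys time, which is why the accepted answer below rewrites the query instead):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("broadcast-timeout-settings") // hypothetical app name
      .getOrCreate()

    // Give the broadcast side more time to materialize
    // (the default of 300 seconds is what produced the timeout above).
    spark.conf.set("spark.sql.broadcastTimeout", "600")

    // Or discourage broadcast strategies by disabling the
    // auto-broadcast size threshold.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")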

Best Answer

The snippets used come from:

1) How to exclude rows that don't join with another table?
2) Spark Duplicate columns in dataframe after join

I was able to solve my problem with a left outer join, like this:

    val leftColKey  = records("eventId")
    val rightColKey = someOtherDF("eventId")

    val toAppend: DataFrame = records
      .join(someOtherDF, leftColKey === rightColKey, "left_outer")
      .filter(rightColKey.isNull) // Keep rows without a match in 'someOtherDF'. See (1)
      .drop(rightColKey)          // Needed to discard the duplicate column. See (2)
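For intuition, here is a tiny self-contained version of the same pattern, with a local SparkSession and made-up IDs (everything in it is hypothetical):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("left-outer-filter-demo") // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val records     = Seq("a", "b", "c").toDF("eventId")
    val someOtherDF = Seq("b").toDF("eventId")

    val leftColKey  = records("eventId")
    val rightColKey = someOtherDF("eventId")

    // Only "b" finds a match, so the null filter keeps "a" and "c":
    // exactly the rows of 'records' that are absent from 'someOtherDF'.
    records
      .join(someOtherDF, leftColKey === rightColKey, "left_outer")
      .filter(rightColKey.isNull)
      .drop(rightColKey)
      .show()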

Performance is very good and the "Futures timed out" problem no longer occurs.

EDIT

As a colleague pointed out to me, the "leftanti" join type is more efficient.
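A sketch of that variant, reusing the names from the snippet above; with an anti join the right side's columns never appear in the output, so the isNull filter and the drop() both go away:

    // "left_anti" keeps only the rows of 'records' whose eventId
    // has no match in 'someOtherDF'.
    val toAppend: DataFrame = records.join(someOtherDF, Seq("eventId"), "left_anti")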

Regarding apache-spark-sql - Spark SQL "Futures timed out after 300 seconds" when filtering, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43665995/
