gpt4 book ai didi

apache-spark - 一个应用程序可以有多少个SparkSession?

转载 作者:行者123 更新时间:2023-12-04 03:58:56 25 4
gpt4 key购买 nike

我发现,随着Spark运行,并且表的大小增加(通过Joins),Spark执行程序最终将耗尽内存,整个系统崩溃。即使我尝试将临时结果写入Hive表(在HDFS上),系统仍然不会释放太多内存,并且大约130个连接后,整个系统崩溃。

但是,通过实验,我意识到,如果将问题分解为较小的部分,将临时结果写入配置单元表,然后停止/启动Spark session (以及Spark上下文),那么系统的资源将被释放。使用这种方法,我能够加入1000多个专栏。

但是我找不到任何文档来理解这是否是一种好的做法(我知道您不应该一次获得多个 session )。大多数系统在开始时获取 session ,然后在结束时关闭 session 。我还可以将应用程序分解为较小的应用程序,并使用Oozie之类的驱动程序在Yarn上调度这些较小的应用程序。但是这种方法将在每个阶段启动和停止JVM,这似乎有点繁重。

所以我的问题是:在单个spark应用程序运行期间连续启动/停止spark session 以释放系统资源是不好的做法吗?

但是,您能否详细说明单个JVM上的单个SparkContext的含义?我可以调用sparkSession.sparkContext().stop(),也可以stop调用SparkSession。然后,我创建了一个新的SparkSession并使用了一个新的sparkContext。没有引发任何错误。

我也可以在JavaSparkPi上使用它,而没有任何问题。

我已经在yarn-clientlocal spark安装中对此进行了测试。

停止spark上下文到底有什么用?为什么停止了Spark上下文后又不能创建它呢?

最佳答案

TL; DR 您可以根据需要具有许多SparkSession

在单个JVM上可以只有一个SparkContext,但是SparkSession的数量几乎是无限的。

But can you elaborate on what you mean by a single SparkContext on a single JVM?



这意味着在Spark应用程序的生命周期中的任何给定时间,驱动程序只能是一个,也只能是一个,这又意味着该JVM上只有一个且只有一个 SparkContext

Spark应用程序的驱动程序位于 SparkContext所在的位置(或者相反,而 SparkContext定义了驱动程序-区别非常模糊)。

您一次只能拥有一个 SparkContext。尽管您可以根据需要启动和停止它多次,但是我记得有一个问题,即您不应该关闭 SparkContext,除非您已经完成了Spark的操作(通常发生在Spark应用程序的最后)。

换句话说,在您的Spark应用程序的整个生命周期中只有一个 SparkContext

关于多个 SparkSession,存在一个类似的问题 What's the difference between SparkSession.sql vs Dataset.sqlContext.sql?,可以进一步说明为什么要进行两个或多个 session 。

I was able call sparkSession.sparkContext().stop(), and also stop the SparkSession.



所以?!这与我说的话有什么矛盾?您停止了JVM上唯一可用的 SparkContext。没有大碍。您可以,但这只是“在单个JVM上只能有一个且只有一个 SparkContext”的一部分,不是吗?
SparkSession只是 SparkContext的包装,可在Spark Core的RDD之上提供Spark SQL的结构化/SQL功能。

从Spark SQL开发人员的角度来看, SparkSession的目的是成为查询实体(例如表, View 或查询所使用的函数(如DataFrames,Datasets或SQL)和Spark属性(每个 SparkSession可能具有不同的值)的 namespace ) )。

如果您想为不同的数据集使用相同的(临时)表名,那么我将建议您创建两个 SparkSession

我刚刚研究了一个示例,以演示整个阶段的代码生成在Spark SQL中的工作方式,并创建了以下代码来简单地关闭该功能。
// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
+- LocalTableScan [_1#88, _2#89, _3#90]

// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2

// Let's see what's the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100

import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)

// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
+- LocalTableScan [_1#121, _2#122, _3#123]

I then created a new SparkSession and used a new SparkContext. No error was thrown.



再次,这与我所说的关于单个 SparkContext可用的说法有何矛盾?我很好奇。

What exactly does stopping the spark context do, and why can you not create a new one once you've stopped one?



您不再可以使用它来运行Spark作业(以处理大型和分布式数据集),这恰好是您首先使用Spark的原因,不是吗?

请尝试以下方法:
  • 停止SparkContext
  • 使用Spark Core的RDD或Spark SQL的数据集API执行任何处理

  • 有异常(exception)吗正确的!请记住,您关闭了Spark的“门”,所以您怎么会期望进入其中? :)

    关于apache-spark - 一个应用程序可以有多少个SparkSession?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47723761/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com