gpt4 book ai didi

python - 我怎样才能拆除一个 SparkSession 并在一个应用程序中创建一个新的?

转载 作者:太空狗 更新时间:2023-10-29 18:03:35 28 4
gpt4 key购买 nike

我有一个pyspark程序,有多个独立的模块,每个模块都可以独立处理数据,以满足我的各种需求。但它们也可以链接在一起以在管道中处理数据。这些模块中的每一个都构建一个 SparkSession 并自行完美执行。

但是,当我尝试在同一个 python 进程中连续运行它们时,我遇到了问题。在管道中的第二个模块执行的那一刻,spark 提示我正在尝试使用的 SparkContext 已停止:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

这些模块中的每一个都在执行开始时构建一个 SparkSession,并在其进程结束时停止 sparkContext。我像这样构建和停止 session /上下文:

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()

根据 official documentation , getOrCreate “获取一个现有的 SparkSession,或者,如果不存在,则根据此构建器中设置的选项创建一个新的。”但我不想要这种行为(这种行为是进程试图获取现有 session 的行为)。我找不到任何方法来禁用它,也不知道如何销毁 session ——我只知道如何停止其关联的 SparkContext。

如何在独立模块中构建新的 SparkSession,并在同一个 Python 进程中按顺序执行它们,而以前的 session 不会干扰新创建的 session ?

以下是项目结构的示例:

主.py

import collect
import process

if __name__ == '__main__':
data = collect.execute()
process.execute(data)

collect.py

import datagetter

def execute(data=None):
session = SparkSession.builder.appName("myApp").getOrCreate()

data = data if data else datagetter.get()
rdd = session.sparkContext.parallelize(data)
[... do some work here ...]
result = rdd.collect()
session.stop()
return result

进程.py

import datagetter

def execute(data=None):
session = SparkSession.builder.appName("myApp").getOrCreate()
data = data if data else datagetter.get()
rdd = session.sparkContext.parallelize(data)
[... do some work here ...]
result = rdd.collect()
session.stop()
return result

最佳答案

长话短说,Spark(包括 PySpark)并非设计用于在单个应用程序中处理多个上下文。如果您对 JVM 方面的故事感兴趣,我建议您阅读 SPARK-2243 (解决为不会修复)。

PySpark 中做出了许多设计决策,包括但不限于 a singleton Py4J gateway .有效you cannot have multiple SparkContexts in a single application . SparkSession 不仅绑定(bind)到 SparkContext,还会引入其自身的问题,例如处理本地(独立)Hive metastore(如果使用的话)。此外,还有内部使用 SparkSession.builder.getOrCreate 的函数,取决于您现在看到的行为。一个值得注意的例子是 UDF 注册。如果存在多个 SQL 上下文,其他函数可能会表现出意外行为(例如 RDD.toDF)。

多个上下文不仅不受支持,而且在我个人看来,还违反了单一职责原则。您的业​​务逻辑不应关注所有设置、清理和配置细节。

个人建议如下:

  • 如果应用程序由多个连贯的模块组成,这些模块可以组合在一起并受益于具有缓存和通用元存储的单个执行环境,则在应用程序入口点初始化所有必需的上下文,并在必要时将这些上下文传递给各个管道:

    • main.py:

      from pyspark.sql import SparkSession

      import collect
      import process

      if __name__ == "__main__":
      spark: SparkSession = ...

      # Pass data between modules
      collected = collect.execute(spark)
      processed = process.execute(spark, data=collected)
      ...
      spark.stop()
    • collect.py/process.py:

      from pyspark.sql import SparkSession

      def execute(spark: SparkSession, data=None):
      ...
  • 否则(根据您的描述,这里似乎是这种情况)我会设计入口点来执行单个管道并使用外部工作流管理器(如 Apache AirflowToil )来处理执行。

    它不仅更清洁,而且允许更灵活的故障恢复和调度。

    同样的事情当然可以用构建器来完成,但就像 smart person曾经说过:显式优于隐式。

    • main.py

      import argparse

      from pyspark.sql import SparkSession

      import collect
      import process

      pipelines = {"collect": collect, "process": process}

      if __name__ == "__main__":
      parser = argparse.ArgumentParser()
      parser.add_argument('--pipeline')
      args = parser.parse_args()

      spark: SparkSession = ...

      # Execute a single pipeline only for side effects
      pipelines[args.pipeline].execute(spark)
      spark.stop()
    • collect.py/process.py 与上一点相同。

无论哪种方式,我都会保留一个而且只有一个地方设置了上下文,并且只有一个地方被拆除。

关于python - 我怎样才能拆除一个 SparkSession 并在一个应用程序中创建一个新的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41491972/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com