
python - Modify SparkContext from an external __main__ file sent to spark-submit


I'm trying to package up Python dependencies to ship to a Hadoop cluster with spark-submit, and I want to do it in the DRYest way possible.

I'd like my my_spark_app.py to look like this:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('MyApp').setMaster('yarn-client')
sc = SparkContext(conf=conf)

sc.addPyFile('/path/to/dependencies.py')
from dependencies import DependencyManager
dm = DependencyManager(sc)

dm.register_lib('dateutil')
import dateutil

# do stuff with dateutil
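
For example, the "do stuff" part could be a small job like the following (a made-up sketch; parse_year and the sample dates are only for illustration). It is also why the import has to resolve on the executors, not just on the driver:

def parse_year(s):
    # dateutil is resolved from the zip shipped via addPyFile,
    # on the executors as well as on the driver
    import dateutil.parser
    return dateutil.parser.parse(s).year

years = sc.parallelize(['2016-03-25', '2016-03-26']).map(parse_year).collect()
print(years)  # [2016, 2016]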

And dependencies.py then looks like this:

import zipfile, os

LIBPATH = '/path/to/my/python/env/lib/python2.7/site-packages/'

class DependencyManager(object):
    """
    Collects dependencies to be zipped and sent to the spark context
    """
    def __init__(self, spark_context):
        self.sc = spark_context

    def register_lib(self, p):
        libpath = os.path.join(LIBPATH, p)
        zippath = libpath + '.zip'
        zf = zipfile.PyZipFile(zippath, mode='w')
        try:
            zf.debug = 3
            zf.writepy(libpath)
            self.sc.addPyFile(zippath)
        finally:
            zf.close()

This produces the following output (because of zf.debug = 3):

Adding package in /path/to/env/lib/python2.7/site-packages/dateutil as dateutil
Adding dateutil/__init__.pyc
Adding dateutil/rrule.pyc
Adding dateutil/relativedelta.pyc
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil/zoneinfo as dateutil/zoneinfo
Adding dateutil/zoneinfo/__init__.pyc
Adding dateutil/zoneinfo/rebuild.pyc
Adding dateutil/parser.pyc
Adding dateutil/tzwin.pyc
Adding dateutil/easter.pyc
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil/tz as dateutil/tz
Adding dateutil/tz/__init__.pyc
Adding dateutil/tz/tz.pyc
Adding dateutil/tz/win.pyc
Adding dateutil/tz/_common.pyc
Traceback (most recent call last):
  File "/path/to/my_spark_app.py", line 25
    import dateutil
ImportError: No module named dateutil

Somehow, calling self.sc.addPyFile() from inside the DependencyManager class has no effect on the SparkContext, even though the same call works fine when made directly in my_spark_app.py.

What's going on here?

Best answer

The problem is simple, and it has little to do with Spark. Here:

def register_lib(self, p):
    libpath = os.path.join(LIBPATH, p)
    zippath = libpath + '.zip'
    zf = zipfile.PyZipFile(zippath, mode='w')
    try:
        zf.debug = 3
        zf.writepy(libpath)
        self.sc.addPyFile(zippath)
    finally:
        zf.close()

When self.sc.addPyFile(zippath) is called, the zf handle is still open, so the archive on disk is incomplete: a zip's central directory is only written out when the file is closed, and until then readers see a corrupt file.
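
You can see this without Spark at all; a minimal sketch (the /tmp/demo.zip path and the dummy.py entry are invented for illustration):

import zipfile

zf = zipfile.PyZipFile('/tmp/demo.zip', mode='w')
zf.writestr('dummy.py', 'x = 1')

# Until close() writes the central directory, the archive cannot be read:
try:
    zipfile.ZipFile('/tmp/demo.zip').namelist()
except zipfile.BadZipfile:
    print('archive is not readable yet')

zf.close()
print(zipfile.ZipFile('/tmp/demo.zip').namelist())  # ['dummy.py']

So we just need to close the archive before the addPyFile call: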

def register_lib(self, p):
    libpath = os.path.join(LIBPATH, p)
    zippath = libpath + '.zip'
    zf = zipfile.PyZipFile(zippath, mode='w')
    try:
        zf.debug = 3
        zf.writepy(libpath)
        zf.close()  # file is now ready to add to the spark context
        self.sc.addPyFile(zippath)
    finally:
        zf.close()
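
Equivalently, since ZipFile (and therefore its PyZipFile subclass) can be used as a context manager as of Python 2.7, the close-before-register ordering can be made explicit with a with block; a sketch, not the original answer's code:

def register_lib(self, p):
    libpath = os.path.join(LIBPATH, p)
    zippath = libpath + '.zip'
    # The with block guarantees the archive is finalized and closed
    # before addPyFile() ships it to the cluster.
    with zipfile.PyZipFile(zippath, mode='w') as zf:
        zf.debug = 3
        zf.writepy(libpath)
    self.sc.addPyFile(zippath)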

Regarding python - Modify SparkContext from an external __main__ file sent to spark-submit, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36212385/
