gpt4 book ai didi

python - 在 PySpark 中应用自定义函数时使用外部模块

转载 作者:太空宇宙 更新时间:2023-11-03 16:48:21 31 4
gpt4 key购买 nike

以下代码片段尝试将一个简单的函数应用于 PySpark RDD 对象:

import pyspark
conf = pyspark.SparkConf()
conf.set('spark.dynamicAllocation.minExecutors', 5)
sc = SparkContext(appName="tmp", conf=conf)
sc.setLogLevel('WARN')

fn = 'my_csv_file'
rdd = sc.textFile(fn)
rdd = rdd.map(lambda line: line.split(","))
header = rdd.first()
rdd = rdd.filter(lambda line:line != header)
def parse_line(line):
ret = pyspark.Row(**{h:line[i] for (i, h) in enumerate(header)})
return ret
rows = rdd.map(lambda line: parse_line(line))
sdf = rows.toDF()

如果我使用 python my_snippet.py 启动程序,它会失败并提示:

File "<ipython-input-27-8e46d56b2984>", line 6, in <lambda>
File "<ipython-input-27-8e46d56b2984>", line 3, in parse_line
NameError: global name 'pyspark' is not defined

我将 parse_line 函数替换为以下内容:

def parse_line(line):
ret = h:line[i] for (i, h) in enumerate(header)
ret['dir'] = dir()
return ret

现在,数据框已创建,dir 列显示内部的命名空间该函数仅包含两个对象:lineret。如何将其他模块和对象作为函数的一部分?不仅是 pyspark,还有其他的。

编辑请注意,pyspark 在程序中可用。仅当该函数由 map(并且我假设 filterreduce 等)调用时,它才不会看到任何导入的模块。

最佳答案

1)对原问题的回答:问题的根源似乎是运行 python my_snippet.py您应该使用 spark-submit my_snippet.py

执行代码

2) ipython笔记本问题的答案:在我的 ipython 笔记本个人配置中,以下行不存在:

import pyspark
conf = pyspark.SparkConf()
conf.set('spark.dynamicAllocation.minExecutors', 5)
sc = SparkContext(appName="tmp", conf=conf)

“sc”在我的程序范围之外定义

3)回答有关numpy(或其他需要安装的模块)的问题为了使用 numpy,您需要在集群中的每个节点上安装 numpy(使用 apt-get 或 pip 或从源安装)。

关于python - 在 PySpark 中应用自定义函数时使用外部模块,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36111685/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com