gpt4 book ai didi

python - 如何使用 Apache-Spark 在 AWS 集群上运行代码?

转载 作者:行者123 更新时间:2023-12-01 04:37:35 28 4
gpt4 key购买 nike

我编写了一个Python代码来总结每个csv文件第一列中的所有数字,如下所示:

import os, sys, inspect, csv

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Setup pyspark directory path
pyspark_dir = python_dir
sys.path.append(pyspark_dir)

### Import the pyspark
from pyspark import SparkConf, SparkContext

### Specify the data file directory, and load the data files
data_path = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "./test_dir")))

### myfunc is to add all numbers in the first column.
def myfunc(s):
total = 0
if s.endswith(".csv"):
cr = csv.reader(open(s,"rb"))
for row in cr:
total += int(row[0])
return total

def main():
### Initialize the SparkConf and SparkContext
conf = SparkConf().setAppName("ruofan").setMaster("spark://ec2-52-26-177-197.us-west-2.compute.amazonaws.com:7077")
sc = SparkContext(conf = conf)
datafile = sc.wholeTextFiles(data_path)

### Sent the application in each of the slave node
temp = datafile.map(lambda (path, content): myfunc(str(path).strip('file:')))

### Collect the result and print it out.
for x in temp.collect():
print x

if __name__ == "__main__":
main()

我想使用 Apache-Spark 使用相同的 python 代码并行化多个 csv 文件的求和过程。我已经完成了以下步骤:

  1. 我在 AWS 上创建了一个主节点和两个从节点。
  2. 我使用了 bash 命令 $ scp -r -i my-key-pair.pem my_dir root@ec2-52-27-82-124.us-west-2.compute.amazonaws.com上传目录my_dir将我的 python 代码和 csv 文件包含到集群主节点上。
  3. 我已经登录了我的主节点,并从那里使用了 bash 命令 $ ./spark/copy-dir my_dir将我的 python 代码以及 csv 文件发送到所有从属节点。
  4. 我已经在主节点上设置了环境变量:

    $ export SPARK_HOME=~/spark

    $ export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

但是,当我在主节点上运行python代码时:$ python sum.py ,它显示以下错误:

Traceback (most recent call last):
File "sum.py", line 18, in <module>
from pyspark import SparkConf, SparkContext
File "/root/spark/python/pyspark/__init__.py", line 41, in <module>
from pyspark.context import SparkContext
File "/root/spark/python/pyspark/context.py", line 31, in <module>
from pyspark.java_gateway import launch_gateway
File "/root/spark/python/pyspark/java_gateway.py", line 31, in <module>
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
ImportError: No module named py4j.java_gateway

我对这个错误没有任何想法。另外,我想知道主节点是否会自动调用所有从节点并行运行。如果有人能帮助我,我真的很感激。

最佳答案

以下是我调试此特定导入错误的方法。

  1. 通过 ssh 连接到您的主节点
  2. 使用 $ python 运行 python REPL
  3. 尝试失败的导入行 >> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  4. 如果失败,请尝试简单地运行 >> import py4j
  5. 如果失败,则意味着您的系统没有安装 py4j 或找不到它。
  6. 退出 REPL >> exit()
  7. 尝试安装 py4j $ pip install py4j (您需要安装 pip)
  8. 打开 REPL $ python
  9. 尝试再次导入 >> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  10. 如果有效,则 >> exit() 并尝试再次运行 $ python sum.py

关于python - 如何使用 Apache-Spark 在 AWS 集群上运行代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31464881/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com