gpt4 book ai didi

hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?

转载 作者:可可西里 更新时间:2023-11-01 14:37:18 41 4
gpt4 key购买 nike

我想在 PySpark 中高效地将 numpy 数组从工作机器(函数)保存到 HDFS 或从工作机器(函数)读取 numpy 数组。我有两台机器 A 和 B。A 有 master 和 worker。 B 有一名 worker 。例如我想实现如下目标:

if __name__ == "__main__":
conf = SparkConf().setMaster("local").setAppName("Test")
sc = SparkContext(conf = conf)
sc.parallelize([0,1,2,3], 2).foreachPartition(func)

def func(iterator):
P = << LOAD from HDFS or Shared Memory as numpy array>>
for x in iterator:
P = P + x

<< SAVE P (numpy array) to HDFS/ shared file system >>

什么是快速有效的方法?

最佳答案

我遇到了同样的问题。并最终使用了使用 HdfsCli module 的解决方法和 Python3.4 的临时文件。

  1. 进口:
from hdfs import InsecureClient
from tempfile import TemporaryFile
  1. 创建一个 hdfs 客户端。在大多数情况下,最好在脚本中的某处使用实用函数,例如:
def get_hdfs_client():
return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
root="<hdfs base path>")
  1. 在工作函数中加载并保存您的 numpy:
hdfs_client = get_hdfs_client()

# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()

with hdfs_client.read(path) as reader:
tf.write(reader.read())
tf.seek(0) # important, set cursor to beginning of file

np_array = numpy.load(tf)

...

# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(), overwrite=True)

注意事项:

  • 用于创建hdfs客户端的uri以http://开头,因为它使用了hdfs文件系统的web界面;
  • 确保你传给hdfs客户端的用户有读写权限
  • 根据我的经验,开销并不大(至少在执行时间方面)
  • 使用临时文件(相对于 /tmp 中的常规文件)的优点是您可以确保在脚本结束后集群机器中没有垃圾文件,无论是否正常

关于hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33791535/

41 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com