作者热门文章
- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我想在 PySpark 中高效地将 numpy 数组从工作机器(函数)保存到 HDFS 或从工作机器(函数)读取 numpy 数组。我有两台机器 A 和 B。A 有 master 和 worker。 B 有一名 worker 。例如我想实现如下目标:
if __name__ == "__main__":
conf = SparkConf().setMaster("local").setAppName("Test")
sc = SparkContext(conf = conf)
sc.parallelize([0,1,2,3], 2).foreachPartition(func)
def func(iterator):
P = << LOAD from HDFS or Shared Memory as numpy array>>
for x in iterator:
P = P + x
<< SAVE P (numpy array) to HDFS/ shared file system >>
什么是快速有效的方法?
最佳答案
我遇到了同样的问题。并最终使用了使用 HdfsCli module 的解决方法和 Python3.4 的临时文件。
from hdfs import InsecureClient
from tempfile import TemporaryFile
def get_hdfs_client():
return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
root="<hdfs base path>")
hdfs_client = get_hdfs_client()
# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()
with hdfs_client.read(path) as reader:
tf.write(reader.read())
tf.seek(0) # important, set cursor to beginning of file
np_array = numpy.load(tf)
...
# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(), overwrite=True)
注意事项:
http://
开头,因为它使用了hdfs文件系统的web界面;/tmp
中的常规文件)的优点是您可以确保在脚本结束后集群机器中没有垃圾文件,无论是否正常关于hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33791535/
我是一名优秀的程序员,十分优秀!