gpt4 book ai didi

python : How to import list of files in directory from HDFS

转载 作者:行者123 更新时间:2023-12-01 08:25:10 27 4
gpt4 key购买 nike

我尝试在 python 中从 HDFS 导入文件列表。

如何从 HDFS 执行此操作:

path =r'/my_path'
allFiles = glob.glob(path + "/*.csv")

df_list = []
for file_ in allFiles:
df = pd.read_csv(file_,index_col=None, header=0,sep=';')
df_list.append(df)

我认为subprocess.Popen可以解决问题,但如何仅提取文件名?

import subprocess
p = subprocess.Popen("hdfs dfs -ls /my_path/ ",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)


for line in p.stdout.readlines():
print(line)

输出如下:

b'Found 32 items\n'
b'-rw------- 3 user hdfs 42202621 2019-01-21 10:05 /my_path/file1.csv\n'
b'-rw------- 3 user hdfs 99320020 2019-01-21 10:05 /my_path/file2.csv\n'

最佳答案

声明:这将是一个漫长而乏味的过程。但考虑到这种情况,我会尽力使其尽可能通用和可重复。

<小时/>

考虑到不需要外部库(除了pandas?),没有必要做出选择。我建议尽可能多地利用 WebHDFS

据我所知,默认情况下,HDFS 的安装包括 WebHDFS 的安装。以下解决方案严重依赖于WebHDFS

第一步

首先,您必须了解WebHDFS url。 WebHDFS安装在HDFS Namenode上,默认端口为50070

因此,我们从 http://[namenode_ip]:50070/webhdfs/v1/ 开始,其中 /webhdfs/v1/是所有人的通用 URL。

为了举例,我们假设它为 http://192.168.10.1:50070/web/hdfs/v1

第二步

通常,可以使用curl来列出HDFS目录的内容。详细解释参见WebHDFS REST API: List a Directory

如果您要使用curl,以下提供给定目录内所有文件的FileStatuses

curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
^^^^^^^^^^^^ ^^^^^ ^^^^ ^^^^^^^^^^^^^
Namenode IP Port Path Operation

如上所述,这会返回 JSON 对象中的 FileStatuses:

{
"FileStatuses":
{
"FileStatus":
[
{
"accessTime" : 1320171722771,
"blockSize" : 33554432,
"group" : "supergroup",
"length" : 24930,
"modificationTime": 1320171722771,
"owner" : "webuser",
"pathSuffix" : "a.patch",
"permission" : "644",
"replication" : 1,
"type" : "FILE"
},
{
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0,
"modificationTime": 1320895981256,
"owner" : "szetszwo",
"pathSuffix" : "bar",
"permission" : "711",
"replication" : 0,
"type" : "DIRECTORY"
},
...
]
}
}

使用 python 的默认库可以实现相同的结果:

import requests

my_path = '/my_path/'
curl = requests.get('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path)

如上所示,每个文件的实际状态比结果 JSON 低两级。换句话说,获取每个文件的FileStatus:

curl.json()['FileStatuses']['FileStatus'] 

[
{
"accessTime" : 1320171722771,
"blockSize" : 33554432,
"group" : "supergroup",
"length" : 24930,
"modificationTime": 1320171722771,
"owner" : "webuser",
"pathSuffix" : "a.patch",
"permission" : "644",
"replication" : 1,
"type" : "FILE"
},
{
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0,
"modificationTime": 1320895981256,
"owner" : "szetszwo",
"pathSuffix" : "bar",
"permission" : "711",
"replication" : 0,
"type" : "DIRECTORY"
},
...
]

第三步

既然您现在已经拥有了所需的所有信息,那么您所需要做的就是解析。

import os

file_paths = []
for file_status in curl.json()['FileStatuses']['FileStatus']:
file_name = file_status['pathSuffix']
# this is the file name in the queried directory
if file_name.endswith('.csv'):
# if statement is only required if the directory contains unwanted files (i.e. non-csvs).
file_paths.append(os.path.join(path, file_name))
# os.path.join asserts your result consists of absolute path

file_paths
['/my_path/file1.csv',
'/my_path/file2.csv',
...]

最后一步

现在您知道了文件和 WebHDFS 链接的路径,pandas.read_csv 可以处理其余的工作。

import pandas as pd

dfs = []
web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
# ^^^^^^^
# Operation is now OPEN
for file_path in file_paths:
file_url = web_url % file_path
# http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
dfs.append(pd.read_csv(file_url))

这样您就可以导入所有 .csv 并将其分配给 dfs

警告

如果您的 HDFS 配置为 HA(高可用性),则会有多个 namenode,因此您的 namenode_ip 必须进行相应设置:必须是主节点的IP。

关于 python : How to import list of files in directory from HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54306613/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com