gpt4 book ai didi

pyspark - 从 hdfs 目录迭代 pyspark 中的文件

转载 作者:行者123 更新时间:2023-12-04 14:18:26 41 4
gpt4 key购买 nike

我在 hdfs 目录中有文件列表,我想从 hdfs 目录遍历 pyspark 中的文件,并将每个文件存储在一个变量中,并使用该变量进行进一步处理。我在下面收到错误..

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonUtils.toSeq. Trace: 
py4j.Py4JException: Method toSeq([class org.apache.hadoop.fs.Path]) does not exist

InputDir = "/Data/Ready/ARRAY_COUNTERS" # 输入 hdfs 目录。

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(InputDir)

for f in fs.get(conf).listStatus(path):
Filename = f.getPath()

df = spark.read.csv(Filename,header=True)
#I am getting above error in while reading this file.

最佳答案

关于这两行:

    Filename =  f.getPath()

df = spark.read.csv(Filename,header=True)

getPath()不是字符串。此外 - f 也可以是目录,因此要确保您没有尝试加载目录,您可以在 f.isFile() 上添加验证:

if(f.isFile()):
Filename = f.getPath()
df = spark.read.csv(str(Filename),header=True)

现在对我有用的替代方案是:

if(f.isFile()):
Filename = f.getPath()
df = sc.textFile(str(Filename), 500).map(lambda x: x.split(", ")) #or any other spearator, returns RDD
headers=df.first() # to infer schema - you can then convert it to pyspark dataframe with specific column types

关于pyspark - 从 hdfs 目录迭代 pyspark 中的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57859371/

41 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com