gpt4 book ai didi

hadoop - fs.rename(newPath(raw FileName), in Path(process FileName)) 不工作

转载 作者:可可西里 更新时间:2023-11-01 14:30:19 29 4
gpt4 key购买 nike

我正在研究基于 Scala 的 Apache Spark 实现,用于将数据从远程位置加载到 HDFS,然后将数据从 HDFS 提取到 Hive 表。

使用我的第一个 spark 作业,我已将数据/文件载入 HDFS 中的某个位置 -

hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/ folder

让我们考虑一下,在载入 CT_Click_Basic.csv 和 CT_Click_Basic1.csv.gz 文件后,我在 HDFS 中有以下文件 [共享位置的文件名将是此处的文件夹名称,其内容将出现在 part-xxxxx 文件中]:

[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/Found 3 items

-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS

-rw-r--r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000

-rw-r--r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001

Found 2 items

-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS

-rw-r--r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

现在使用我的另一个 Spark 作业,我想将这些文件从 /raw 文件夹移动到 /process,然后最后移动到 /archive HDFS 中基于每个阶段执行的任务的文件夹。

为此,我首先使用以下代码获取 /raw 文件夹下所有文件的列表:

    def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
if(!fileStatus.isDirectory()) {
filePaths+=fileStatus.getPath()
}
else {
listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
}
}
}
filePaths
}

然后使用以下代码行,我尝试将/raw 文件夹中的文件重命名/移动到/process 文件夹:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}

但我无法使用上面编写的代码移动/重命名这些文件。我尝试调试代码,发现 fs.rename() 返回“false”。

Please Note: I am able to achieve the file renaming/movement when I copy any file manually in /data/analytics/raw folder ex CT.csv [or any other file]and then running fs.rename() but it is not working for Part-xxxxx files.

有什么我想念的吗?

任何快速帮助将不胜感激。

问候,布佩什

最佳答案

终于找到问题了。实际上,我试图将文件从/data/analytics/raw/folder.csv/part-xxxxx 重命名为/data/analytics/process/folder.csv/part-xxxxx,其中/data/analytics/process 存在于 HDFS 中,但是“folder.csv”不存在;因此它在重命名时返回 false。我在我的代码中添加了以下行并且对我来说工作得很好

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size

val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)

var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
var processFolderPath = new Path(processFolderName)
if(!(fs.exists(processFolderPath)))
fs.mkdirs(processFolderPath)
val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}

关于hadoop - fs.rename(newPath(raw FileName), in Path(process FileName)) 不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45459096/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com