gpt4 book ai didi

hadoop - Spark - 如何在 HDFS 中重组目录

转载 作者:可可西里 更新时间:2023-11-01 16:37:27 26 4
gpt4 key购买 nike

我有一个结构如下的目录:

temp/Tweets/userId123/Tweets.csv
temp/Tweets/userId456/Tweets.csv
temp/Tweets/userId789/Tweets.csv

temp/Mentions/userId123/Mentions.csv
temp/Mentions/userId456/Mentions.csv
temp/Mentions/userId789/Mentions.csv

.
.
.

数据由数据实体的类型构成,我想由用户对其进行重组,如下所示:

final/userId123/Tweets.csv
final/userId123/Mentions.csv
.
.

final/userId456/Tweets.csv
final/userId456/Mentions.csv
.
.

我一直在 google/StackOverflow/Spark 文档上四处寻找,但还没有找到执行此操作的方法,但我认为应该有一种修改目录结构的方法。我该怎么做?

最佳答案

您可以使用来自 Scala(或 Python 或 Java - 这里我将使用 Scala)的 hadoop.fs.FileSystem API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

首先,让我们定义一种列出 userids hdfs 文件夹的方法:

def listFolderNamesInFolder(hdfsPath: String): List[String] =
FileSystem
.get(new Configuration())
.listStatus(new Path(hdfsPath))
.flatMap(status => if (!status.isFile) Some(status.getPath.getName) else None)
.toList

然后让我们定义两个助手来移动 hdfs 文件和创建 hdfs 文件夹:

def moveFile(oldPath: String, newPath: String): Unit = {
val fileSystem = FileSystem.get(new Configuration())
fileSystem.rename(new Path(oldPath), new Path(newPath))
}

def createFolder(hdfsPath: String): Unit =
FileSystem.get(new Configuration()).mkdirs(new Path(hdfsPath))

最后,让我们遍历每个用户 ID 文件夹并将每个推文、提及文件移动到关联的最终文件夹:

def moveTweetFiles(hdfsPath: String): Unit =
listFolderNamesInFolder(s"$hdfsPath/temp/Tweets").foreach {
case userid =>
createFolder(s"$hdfsPath/final/$userid")
moveFile(
s"$hdfsPath/temp/Tweets/$userid/Tweets.csv",
s"$hdfsPath/final/$userid/Tweets.csv")
}

def moveMentionsFiles(hdfsPath: String): Unit =
listFolderNamesInFolder(s"$hdfsPath/temp/Mentions").foreach {
case userid =>
createFolder(s"$hdfsPath/final/$userid")
moveFile(
s"$hdfsPath/temp/Mentions/$userid/Mentions.csv",
s"$hdfsPath/final/$userid/Mentions.csv")
}

如果您的 hdfs 根文件夹(包含临时文件夹和最终文件夹的文件夹)是“src/test/resources”(我用来测试的文件夹):

moveTweetFiles("src/test/resources")
moveMentionsFiles("src/test/resources")

顺便说一句:FileSystem 已经嵌入在 Spark 依赖项中(无需添加额外的依赖项)。

这可以作为 Spark 作业 (spark-submit) 启动,即使我们不使用任何 Spark 管道;或者可能只是来自 spark-shell。

关于hadoop - Spark - 如何在 HDFS 中重组目录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49180129/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com