gpt4 book ai didi

json - 将 JSON 文件读入 Spark 数据集并从单独的 Map 添加列

转载 作者:行者123 更新时间:2023-12-04 07:50:04 27 4
gpt4 key购买 nike

Spark 2.1 和 Scala 2.11 在这里。我有一个大 Map[String,Date]其中有 10K 个键/值对。我还有 10K JSON 文件存在于 Spark 可访问的文件系统上:

mnt/
some/
path/
data00001.json
data00002.json
data00003.json
...
data10000.json

映射中的每个 KV 对对应于其各自的 JSON 文件(因此第一个映射 KV 对对应于 data00001.json 等)

我想将所有这些 JSON 文件读入 1 个大型 Spark Dataset并且,当我在做的时候,向这个数据集添加两个新列(JSON 文件中不存在)。每个映射键将是第一个新列的值,每个键的值将是第二个新列的值:
val objectSummaries = getScalaList()
val dataFiles = objectSummaries.filter { _.getKey.endsWith("data.json") }
val dataDirectories = dataFiles.map(dataFile => {
val keyComponents = dataFile.getKey.split("/")
val parent = if (keyComponents.length > 1) keyComponents(keyComponents.length - 2) else "/"
(parent, dataFile.getLastModified)
})

// TODO: How to take each KV pair from dataDirectories above and store them as the values for the
// two new columns?
val allDataDataset = spark.read.json("mnt/some/path/*.json")
.withColumn("new_col_1", dataDirectories._1)
.withColumn("new_col_2", dataDirectories._2)

我已经确认,当我删除 mnt/some/path/*.json 时,Spark 将遵守通配符( withColumn )并将所有 JSON 文件读入单个数据集。方法并做一个 allData.show() .所以我在那里一切都很好。

我正在挣扎的是: 如何添加两个新列,然后正确提取所有键/值映射元素?

最佳答案

如果我理解正确,您想将 map 中的 KV 与 json 文件中的数据帧相关联。

我将尝试将问题简化为仅排序的 3 个文件和 3 个键值。

val kvs = Map("a" -> 1, "b" -> 2, "c" -> 3)
val files = List("data0001.json", "data0002.json", "data0003.json")

定义一个 case 类来处理更简单的文件、键、值
case class FileWithKV(fileName: String, key: String, value: Int)

将压缩文件和 kvs
val filesWithKVs = files.zip(kvs)
.map(p => FileWithKV(p._1, p._2._1, p._2._2))

它看起来像这样
filesWithKVs: List[FileWithKV] = List(FileWithKV(data0001.json,a,1), FileWithKV(data0002.json,b,2), FileWithKV(data0003.json,c,3))

然后我们从一个初始数据框开始,从我们集合的头部开始,然后将开始向左折叠以构建将保存所有文件的整个数据框,所有列都是从 KV 动态生成的
val head = filesWithKVs.head
val initialDf = spark
.read.json(head.filename)
.withColumn(s"new_col_1", lit(head.key))
.withColumn(s"new_col_2", lit(head.value))

现在折叠部分
val dfAll = filesWithKVs.tail.foldLeft(initialDf)((df, fileWithKV) => {
val newDf = spark
.read.json(fileWithKV.filename)
.withColumn(s"new_col_1", lit(fileWithKV.key))
.withColumn(s"new_col_2", lit(fileWithKV.value))
// union the dataframes to capture file by file, key value with key value
df.union(newDf)
})

数据框将如下所示,假设在 json 文件中将有一个名为 bar 的列和一个值 foo,对于 3 个 json 文件中的每一个
+---+----------+----------+
|bar|new_col_1 |new_col_2 |
+---+----------+----------+
|foo| a| 1|
|foo| b| 2|
|foo| c| 3|
+---+----------+----------+

关于json - 将 JSON 文件读入 Spark 数据集并从单独的 Map 添加列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45445011/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com