gpt4 book ai didi

scala - 使用 scala 在 Spark 应用程序中构建倒排索引

转载 作者:行者123 更新时间:2023-12-03 04:34:23 24 4
gpt4 key购买 nike

我是 Spark 和 scala 编程语言的新手。我的输入是 CSV 文件。我需要在 csv 文件中的值上构建一个倒排索引,如下例所示。

Input: file.csv

attr1, attr2, attr3
1, AAA, 23
2, BBB, 23
3, AAA, 27

output format: value -> (rowid, collumnid) pairs
for example: AAA -> ((1,2),(3,2))
27 -> (3,3)

我从以下代码开始。之后我就被困住了。请帮忙。

object Main {
def main(args: Array[String]) {

val conf = new SparkConf().setAppName("Invert Me!").setMaster("local[2]")
val sc = new SparkContext(conf)

val txtFilePath = "/home/person/Desktop/sample.csv"

val txtFile = sc.textFile(txtFilePath)
val nRows = txtFile.count()

val data = txtFile.map(line => line.split(",").map(elem => elem.trim()))
val nCols = data.collect()(0).length

}
}

最佳答案

保留你的风格的代码可能看起来像

val header = sc.broadcast(data.first())

val cells = data.zipWithIndex().filter(_._2 > 0).flatMap { case (row, index) =>
row.zip(header.value).map { case (value, column) => value ->(column, index) }
}


val index: RDD[(String, Vector[(String, Long)])] =
cells.aggregateByKey(Vector.empty[(String, Long)])(_ :+ _, _ ++ _)

此处的index值应包含所需的CellValue(ColumnName, RowIndex)对的映射

上述方法中的下划线只是缩写的 lambda,它可以用另一种方式编写为

val cellsVerbose = data.zipWithIndex().flatMap {
case (row, 1) => IndexedSeq.empty // skipping header row
case (row, index) => row.zip(header.value).map {
case (value, column) => value ->(column, index)
}
}


val indexVerbose: RDD[(String, Vector[(String, Long)])] =
cellsVerbose.aggregateByKey(zeroValue = Vector.empty[(String, Long)])(
seqOp = (keys, key) => keys :+ key,
combOp = (keysA, keysB) => keysA ++ keysB)

关于scala - 使用 scala 在 Spark 应用程序中构建倒排索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34013606/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com