gpt4 book ai didi

scala - 使用 Scala/Apache Spark 对数据进行分组

转载 作者:行者123 更新时间:2023-12-02 03:29:31 26 4
gpt4 key购买 nike

下面的代码将字符串列表分组为类型 List[(String, List[String])]在长度为 5 的 String 中遇到所有大写的地方,这就是标识符,标识符后面的所有数据都被分组到一个列表中。每个组的终止因素是遇到空行。所以下面的“行”被转换为:

(IDENT,List(p1text, p2text))
(IDENY,List(p2text, p3text, p4text))

在 Scala/Spark 中是否有更惯用的方法来实现这一点? 可能使用带有谓词的 groupBy 调用?

理想情况下,数据结构的类型应该是 RDD[(String, List[String])] 而不是 List[(String, List[String])]

  val lines = List[String]("line1",
" ",
"line2",
" ",
" IDENT",
"p1text",
"p2text",
" ",
" IDENY",
"p2text",
"p3text",
"p4text",
" ",
"some text") //> lines : List[String] = List(line1, "
//|
//|
//| ", line2, "
//|
//|
//| ", " IDENT", p1text, p2text, "
//|
//|
//| ", " IDENY", p2text, p3text, p4text, "
//| ", some text)

def getItems(i: Int): List[String] = {
var iter = i;
val l = new scala.collection.mutable.ArrayBuffer[String]()
while (!lines(iter).trim.isEmpty) {
iter = iter + 1
if(!lines(iter).trim.isEmpty)
l.append(lines(iter).trim)
}
l.toList
} //> getItems: (i: Int)List[String]

val regex = "\\w{5}" //> regex : String = \w{5}

val u: List[(String , List[String])] = lines.zipWithIndex.map({
case (s, i) => {
if (s.trim.toUpperCase.matches(regex)) {
(s.trim, getItems(i))
} else {
("" , List())
}
}
}) //> u : List[(String, List[String])] = List((line1,List()), ("",List()), (line
//| 2,List()), ("",List()), (IDENT,List(p1text, p2text)), ("",List()), ("",List
//| ()), ("",List()), (IDENY,List(p2text, p3text, p4text)), ("",List()), ("",Li
//| st()), ("",List()), ("",List()), ("",List()))


val fi : List[(String, List[String])] = u.filterNot(f => f._2.isEmpty || f._2(0).trim.isEmpty)
//> fi : List[(String, List[String])] = List((IDENT,List(p1text, p2text)), (ID
//| ENY,List(p2text, p3text, p4text)))
fi.foreach(println) //> (IDENT,List(p1text, p2text))
//| (IDENY,List(p2text, p3text, p4text))

最佳答案

您可以从在 Scala 中编写拆分的惯用方法开始:作为递归函数。

def getItems(l: List[String]): List[(String, List[String])] = {
if (l.isEmpty) List()
else {
val caps = "[A-Z]+".r
val (beg, end) = l.span(_.trim.nonEmpty)
if (beg.nonEmpty)
beg.head.trim match {
case caps() => (beg.head.trim, beg.tail) :: getItems(end.drop(1))
case _ => getItems(end.drop(1))
}
else
getItems(end.tail)
}
}

然后你可以通过使它成为一个尾递归函数来加速它。

import scala.annotation.tailrec

def getItemsFast(l: List[String]): List[(String, List[String])] = {
@tailrec
def getItemsAux(l: List[String], res: List[(String, List[String])]): List[(String, List[String])] = {
if (l.isEmpty) res.reverse
else {
val caps = "[A-Z]+".r
val (beg, end) = l.span(_.trim.nonEmpty)
if (beg.nonEmpty)
beg.head.trim match {
case caps() => getItemsAux(end.drop(1), (beg.head.trim, beg.tail)::res)
case _ => getItemsAux(end.drop(1), res)
}
else
getItemsAux(end.tail, res)
}
}

getItemsAux(l,List())
}

然后,如果您有行的 RDD,在 Spark 中检索它的简单方法(但不正确,见下文)是在您的 RDD 上mapPartition

myRDDOfLines.mapPartitions(lines => {
getItemsFast(lines)
})

这应该大部分工作,但是这将无法注意到已分区的记录,例如标识符位于一个分区中,但一些“它的”行在下一个分区中尾随。

错误在于您将记录构建为可分区单元的方式:您真正想要的是记录的 RDD(一条记录是上面输出列表的一个元素,应该清楚键和值应该是什么) .这不是 sc.textFile 给你的。有很多方法可以更好地将数据加载到 Spark 中。例如,您可以:

  • 将您的文本拆分为多个文件中的空行并使用 wholeTextFiles
  • 实现自定义 TextInputFormatRecordReader
  • 如果您可以梳理出最少数量的空白字符用作记录分隔符,则可以使用 hadoop 对多行记录的支持,通过 hadoop.Configuration 对象提供分隔符。 ..

关于scala - 使用 Scala/Apache Spark 对数据进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27963221/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com