gpt4 book ai didi

scala - 使用 scalding 读取多个文件并输出单个文件

转载 作者:可可西里 更新时间:2023-11-01 14:40:10 26 4
gpt4 key购买 nike

这些天我遇到了一个问题,我正在尝试使用 scalding 从多个文件中读取数据并使用单个文件创建输出。我的代码是这样的:

def getFilesSource (paths: Seq[String]) = {
new MultipleTextLineFiles(paths: _*) {
override protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = {
val taps = goodHdfsPaths(hdfsMode).toList.map {
path => CastHfsTap (new Hfs (hdfsScheme, path, sinkMode))
}

taps.size match {
case 0 => {
CastHfsTap (new Hfs(hdfsScheme, hdfsPaths.head, sinkMode))
}
case 1 => taps.head
case _ => new ScaldingMultiSourceTap(taps)
}
}
}
}

但是当我运行这段代码时,它会将我的输出分成许多文件,但里面的数据非常少:只有几 K。相反,我希望能够将所有输出文件聚合到一个文件中。

我的烫金代码是:

val source = getFilesSource(mapped) // where mapped is a Sequence of valid HDFS paths (Seq [String])

TypedPipe.from(source).map(a => Try{
val json = JSON.parseObject(a)
(json.getInteger("prop1"), json.getInteger("prop2"), json.getBoolean("prop3"))
}.toOption).filter(a => a.nonEmpty)
.map(a => a.get)
.filter(a => !a._3)
.map (that => MyScaldingType (that._1, that._2))
.write(MyScaldingType.typedSink(typedArgs))

我想我必须重写 ScaldingMultiSourceTap 类型的“sourceConfInit”方法,但我不知道在里面写什么......

最佳答案

您可以使用 groupAll 将所有 map 输出(该作业是一个 map only 作业)发送到单个 reducer,考虑到数据很小,然后进行写入。输出将写入单个文件。

.
.
.
.filter(a => !a._3)
.map (that => MyScaldingType (that._1, that._2))
.groupAll
.write(MyScaldingType.typedSink(typedArgs))

关于scala - 使用 scalding 读取多个文件并输出单个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41183061/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com