gpt4 book ai didi

scala - 如何在 Apache Spark 中读取包含多个文件的 zip

转载 作者:行者123 更新时间:2023-12-03 03:58:58 28 4
gpt4 key购买 nike

我有一个包含多个文本文件的压缩文件。我想读取每个文件并构建一个包含每个文件内容的 RDD 列表。

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

将只是整个文件,但如何迭代 zip 的每个内容,然后使用 Spark 将其保存在 RDD 中。

我对 Scala 或 Python 很满意。

Python 中使用 Spark 的可能解决方案 -

archive = zipfile.ZipFile(archive_path, 'r')
file_paths = zipfile.ZipFile.namelist(archive)
for file_path in file_paths:
urls = file_path.split("/")
urlId = urls[-1].split('_')[0]

最佳答案

Apache Spark 默认压缩支持

我已经在其他答案中写了所有必要的理论,您可能需要引用:https://stackoverflow.com/a/45958182/1549135

读取包含多个文件的 zip

我已遵循@Herman给出的建议并使用了ZipInputStream。这给了我这个解决方案,它返回 zip 内容的 RDD[String]

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

def readFile(path: String,
minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile {
case null => zis.close(); false
case _ => true
}
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
}
} else {
sc.textFile(path, minPartitions)
}
}
}

只需导入隐式类并调用 SparkContext 上的 readFile 方法即可使用它:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)

关于scala - 如何在 Apache Spark 中读取包含多个文件的 zip,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32080475/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com