gpt4 book ai didi

scala - 如何从 spark 执行器读取 HDFS 文件?

转载 作者:行者123 更新时间:2023-12-02 18:37:46 26 4
gpt4 key购买 nike

我有一个大的(> 500m 行)CSV 文件。此 CSV 文件中的每一行都包含一个指向位于 HDFS 上的二进制文件的路径。我想使用 Spark 读取每个文件、处理它们并将结果写入另一个 CSV 文件或表格。

在驱动程序中这样做很简单,下面的代码就可以完成工作

val hdfsFilePathList = // read paths from CSV, collect into list

hdfsFilePathList.map( pathToHdfsFile => {
sqlContext.sparkContext.binaryFiles(pathToHdfsFile).mapPartitions {
functionToProcessBinaryFiles(_)
}
})

这个的主要问题是驱动程序做了太多的工作。我想将 binaryFiles 完成的工作外包给执行者。我发现了一些我认为可以让我从执行者访问 sparkContext 的有前途的例子:

Use SparkContext hadoop configuration within RDD methods/closures, like foreachPartition

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala

但它们似乎并没有像我想象的那样工作。我希望以下内容起作用:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration

class ConfigSerDeser(var conf: Configuration) extends Serializable {

def this() {
this(new Configuration())
}

def get(): Configuration = conf

private def writeObject (out: java.io.ObjectOutputStream): Unit = {
conf.write(out)
}

private def readObject (in: java.io.ObjectInputStream): Unit = {
conf = new Configuration()
conf.readFields(in)
}

private def readObjectNoData(): Unit = {
conf = new Configuration()
}
}

val serConf = new ConfigSerDeser(sc.hadoopConfiguration)

val mappedIn = inputDf.map( row => {
serConf.get()
})

但它失败了 KryoException: java.util.ConcurrentModificationException

是否可以让执行者直接访问 HDFS 文件或 HDFS 文件系统?或者,是否有一种有效的方法来读取 HDFS/S3 上的数百万个二进制文件并使用 Spark 处理它们?

最佳答案

有一个类似的用例,我试图做同样的事情,但意识到SparkSession 或 SparkContext 不可序列化,因此无法从执行程序访问。

关于scala - 如何从 spark 执行器读取 HDFS 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55458704/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com