- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在使用 Spark Dataframe API 从 NFS 共享加载/读取文件,然后将该文件的数据保存/写入 HDFS。
我有一个包含一个主节点和两个工作节点的三节点 Spark 集群。我的 Spark 集群使用 YARN 作为集群管理器,因此两个工作节点是 YARN NodeManager 节点,主节点是 Yarn ResourceManager 节点。
我有一个远程位置,比如/data/files,它安装到所有三个 YARN/SPARK 节点,因为它是 [/data/files],其中存在我想要读取的所有 csv 文件 [多个]从并最终写入 HDFS。
我在我的主节点上运行以下代码
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object TestMoreThan1CSV2DF {
private val source: String = "file:///data/files/"
private val destination = "hdfs://<myHostIP>:8020/raw/"
private val fileFormat : String = "com.databricks.spark.csv"
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))
for(file<-fileArray){
// reading csv file from shared location and taking whole data in a dataframe
var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
// variable for holding destination location : HDFS Location
var finalDestination: String = destination+file.getName
// saving data into HDFS
writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
}
}
def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
try{
sqlContext.read.format(fileFormat)
.option("header", header) // Use first line of all files as header
.option("inferSchema", inferSchema) // Automatically infer data types
.load(source)
}
catch{
case ex: OnboardingException => {
throw ex;
}
}
}
def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
try{
df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
}
catch{
Case ez : OnboardingException => {
throw ez;
}
}
}
}
This code reads all the csv files present at shared location /data/files/ and write each one of them to HDFS. Ex: /data/files/f1.csv will get loaded into HDFS as /raw/f1.csv/part-xxxxx file
在运行这段代码时,我无法弄清楚:
1) Where this whole code is running? Is it running on driver? or using both workers?
2) Does load() and save() API runs on worker nodes, does it work in parallel? If yes then how does two workers keeps track of the portion of while which it has read or written?
3) As of now I am reading each file sequentially in "for" loop and working on each one of them sequentially, is it possible to make it a multi threaded application, where each file is allocated to one thread for performing end to end read and write in parallel. Will disk IO be any constraint while doing this?
任何快速响应/引用/指针将不胜感激。
问候,布佩什
最佳答案
为我的查询从另一个线程复制的非常好的解释: differentiate driver code and work code in Apache Spark
这里也复制其中的一部分:转换创建的闭包内发生的所有事情都发生在 worker 上。这意味着如果在 map(...)、filter(...)、mapPartitions(...)、groupBy*(...) 内部传递某些内容,则在 worker 上执行 aggregateBy*(...)。它包括从持久存储或远程源读取数据。
count、reduce(...)、fold(...) 等操作通常在驱动程序和工作程序上执行。繁重的工作由工作人员并行执行,一些最终步骤(例如减少从工作人员收到的输出)在驱动程序上按顺序执行。
其他一切,比如触发 Action 或转换,都发生在驱动程序上。特别是,它表示需要访问 SparkContext 的每个操作。
就我的查询而言:1) 是的,main() 方法的一部分在驱动程序上运行,但转换发生在
2) load() 和 save() 在 worker 上运行,因为我们可以看到加载创建数据帧 [存储在分区的内存中] 并保存在 hdfs 中创建 part-xxxx 文件,这表明 worker 正在这样做
3) 仍在努力实现这一点,一旦完成就会回答这个问题。
谢谢
关于hadoop - sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark Cluster 上运行在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45027385/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!