- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在使用 Spark Dataframe API 从 NFS 共享加载/读取文件,然后将该文件的数据保存/写入 HDFS。
我有一个包含一个主节点和两个工作节点的三节点 Spark 集群。我的 Spark 集群使用 YARN 作为集群管理器,因此两个工作节点是 YARN NodeManager 节点,主节点是 Yarn ResourceManager 节点。
我有一个远程位置,比如/data/files,它安装到所有三个 YARN/SPARK 节点,因为它是 [/data/files],其中存在我想要读取的所有 csv 文件 [多个]从并最终写入 HDFS。
我在我的主节点上运行以下代码
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object TestMoreThan1CSV2DF {
private val source: String = "file:///data/files/"
private val destination = "hdfs://<myHostIP>:8020/raw/"
private val fileFormat : String = "com.databricks.spark.csv"
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))
for(file<-fileArray){
// reading csv file from shared location and taking whole data in a dataframe
var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
// variable for holding destination location : HDFS Location
var finalDestination: String = destination+file.getName
// saving data into HDFS
writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
}
}
def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
try{
sqlContext.read.format(fileFormat)
.option("header", header) // Use first line of all files as header
.option("inferSchema", inferSchema) // Automatically infer data types
.load(source)
}
catch{
case ex: OnboardingException => {
throw ex;
}
}
}
def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
try{
df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
}
catch{
Case ez : OnboardingException => {
throw ez;
}
}
}
}
This code reads all the csv files present at shared location /data/files/ and write each one of them to HDFS. Ex: /data/files/f1.csv will get loaded into HDFS as /raw/f1.csv/part-xxxxx file
在运行这段代码时,我无法弄清楚:
1) Where this whole code is running? Is it running on driver? or using both workers?
2) Does load() and save() API runs on worker nodes, does it work in parallel? If yes then how does two workers keeps track of the portion of while which it has read or written?
3) As of now I am reading each file sequentially in "for" loop and working on each one of them sequentially, is it possible to make it a multi threaded application, where each file is allocated to one thread for performing end to end read and write in parallel. Will disk IO be any constraint while doing this?
任何快速响应/引用/指针将不胜感激。
问候,布佩什
最佳答案
为我的查询从另一个线程复制的非常好的解释: differentiate driver code and work code in Apache Spark
这里也复制其中的一部分:转换创建的闭包内发生的所有事情都发生在 worker 上。这意味着如果在 map(...)、filter(...)、mapPartitions(...)、groupBy*(...) 内部传递某些内容,则在 worker 上执行 aggregateBy*(...)。它包括从持久存储或远程源读取数据。
count、reduce(...)、fold(...) 等操作通常在驱动程序和工作程序上执行。繁重的工作由工作人员并行执行,一些最终步骤(例如减少从工作人员收到的输出)在驱动程序上按顺序执行。
其他一切,比如触发 Action 或转换,都发生在驱动程序上。特别是,它表示需要访问 SparkContext 的每个操作。
就我的查询而言:1) 是的,main() 方法的一部分在驱动程序上运行,但转换发生在
2) load() 和 save() 在 worker 上运行,因为我们可以看到加载创建数据帧 [存储在分区的内存中] 并保存在 hdfs 中创建 part-xxxx 文件,这表明 worker 正在这样做
3) 仍在努力实现这一点,一旦完成就会回答这个问题。
谢谢
关于hadoop - sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark Cluster 上运行在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45027385/
我尝试理解[c代码 -> 汇编]代码 void node::Check( data & _data1, vector& _data2) { -> push ebp -> mov ebp,esp ->
我需要在当前表单(代码)的上下文中运行文本文件中的代码。其中一项要求是让代码创建新控件并将其添加到当前窗体。 例如,在Form1.cs中: using System.Windows.Forms; ..
我有此 C++ 代码并将其转换为 C# (.net Framework 4) 代码。有没有人给我一些关于 malloc、free 和 sprintf 方法的提示? int monate = ee; d
我的网络服务器代码有问题 #include #include #include #include #include #include #include int
给定以下 html 代码,将列表中的第三个元素(即“美丽”一词)以斜体显示的 CSS 代码是什么?当然,我可以给这个元素一个 id 或一个 class,但 html 代码必须保持不变。谢谢
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 7 年前。
我试图制作一个宏来避免重复代码和注释。 我试过这个: #define GrowOnPage(any Page, any Component) Component.Width := Page.Surfa
我正在尝试将我的旧 C++ 代码“翻译”成头条新闻所暗示的 C# 代码。问题是我是 C# 中的新手,并不是所有的东西都像 C++ 中那样。在 C++ 中这些解决方案运行良好,但在 C# 中只是不能。我
在 Windows 10 上工作,R 语言的格式化程序似乎没有在 Visual Studio Code 中完成它的工作。我试过R support for Visual Studio Code和 R-T
我正在处理一些报告(计数),我必须获取不同参数的计数。非常简单但乏味。 一个参数的示例查询: qCountsEmployee = ( "select count(*) from %s wher
最近几天我尝试从 d00m 调试网络错误。我开始用尽想法/线索,我希望其他 SO 用户拥有可能有用的宝贵经验。我希望能够提供所有相关信息,但我个人无法控制服务器环境。 整个事情始于用户注意到我们应用程
我有一个 app.js 文件,其中包含如下 dojo amd 模式代码: require(["dojo/dom", ..], function(dom){ dom.byId('someId').i
我对“-gencode”语句中的“code=sm_X”选项有点困惑。 一个例子:NVCC 编译器选项有什么作用 -gencode arch=compute_13,code=sm_13 嵌入库中? 只有
我为我的表格使用 X-editable 框架。 但是我有一些问题。 $(document).ready(function() { $('.access').editable({
我一直在通过本教程学习 flask/python http://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-i-hello-wo
我想将 Vim 和 EMACS 用于 CNC、G 代码和 M 代码。 Vim 或 EMACS 是否有任何语法或模式来处理这种类型的代码? 最佳答案 一些快速搜索使我找到了 this vim 和 thi
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?更新问题,使其成为 on-topic对于堆栈溢出。 7年前关闭。 Improve this
这个问题在这里已经有了答案: Enabling markdown highlighting in Vim (5 个回答) 6年前关闭。 当我在 Vim 中编辑包含 Markdown 代码的 READM
我正在 Swift3 iOS 中开发视频应用程序。基本上我必须将视频 Assets 和音频与淡入淡出效果合并为一个并将其保存到 iPhone 画廊。为此,我使用以下方法: private func d
pipeline { agent any stages { stage('Build') { steps { e
我是一名优秀的程序员,十分优秀!