- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
关闭。这个问题是opinion-based .它目前不接受答案。
想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题.
3年前关闭。
Improve this question
因此,在面向对象的世界中度过了多年并始终考虑代码重用、设计模式和最佳实践,我发现自己在 Spark 世界中的代码组织和代码重用方面有些挣扎。
如果我尝试以可重用的方式编写代码,它几乎总是伴随着性能成本,我最终将其重写为最适合我的特定用例的任何代码。这种“为这个特定用例编写最佳内容”的常量也会影响代码组织,因为当“它们真的属于一起”时,将代码拆分为不同的对象或模块是很困难的,因此我最终得到的“上帝”对象很少包含长复杂的转换链。事实上,我经常认为,如果我在面向对象的世界中工作时查看了我现在编写的大部分 Spark 代码,我会畏缩并认为它是“意大利面条式代码”。
我在网上冲浪,试图找到某种与面向对象世界的最佳实践等效的方法,但运气不佳。我可以找到一些函数式编程的“最佳实践”,但 Spark 只是增加了一个额外的层,因为性能在这里是一个主要因素。
所以我要问你的问题是,你们中的任何一位 Spark 大师是否找到了一些可以推荐的编写 Spark 代码的最佳实践?
编辑
正如评论中所写,我实际上并不期望有人发布有关如何解决此问题的答案,而是希望这个社区中的某个人遇到了某种 Martin Fowler 类型的人,他在某处写过一些文章或博客文章关于如何解决 Spark 世界中的代码组织问题。
@DanielDarabos 建议我可以举一个代码组织和性能发生冲突的情况的例子。虽然我发现我在日常工作中经常遇到这个问题,但我发现将其归结为一个很好的最小示例有点困难;)但我会尝试。
在面向对象的世界里,我是单一职责原则的忠实粉丝,所以我会确保我的方法只负责一件事。它使它们可重复使用且易于测试。因此,如果我必须,例如,计算列表中某些数字的总和(匹配某些标准)并且我必须计算相同数字的平均值,我肯定会创建两种方法 - 一种计算总和,另一种计算总和计算平均值。像这样:
def main(implicit args: Array[String]): Unit = {
val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))
println("Summed weights for DK = " + summedWeights(list, "DK")
println("Averaged weights for DK = " + averagedWeights(list, "DK")
}
def summedWeights(list: List, country: String): Double = {
list.filter(_._1 == country).map(_._2).sum
}
def averagedWeights(list: List, country: String): Double = {
val filteredByCountry = list.filter(_._1 == country)
filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
def main(implicit args: Array[String]): Unit = {
val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")
println("Summed weights for DK = " + summedWeights(df, "DK")
println("Averaged weights for DK = " + averagedWeights(df, "DK")
}
def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter('country === country)
val summedWeight = countrySpecific.agg(avg('weight))
summedWeight.first().getDouble(0)
}
def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter('country === country)
val summedWeight = countrySpecific.agg(sum('weight))
summedWeight.first().getDouble(0)
}
df
可能包含数十亿行我宁愿不必执行
filter
两次。事实上,性能与 EMR 成本直接相关,所以我真的不想要那样。为了克服它,我因此决定违反 SRP 并简单地将两个函数合二为一,并确保我在国家过滤的
DataFrame
上调用 persist , 像这样:
def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
val averagedWeights = summedWeights / countrySpecific.count()
(summedWeights, averagedWeights)
}
df
在将它交给 sum 和 avg 函数之前(这也将是更多的 SRP),但在现实生活中可能会有一些中间计算需要一次又一次地进行。换句话说,
filter
这里的函数只是试图为一些可以从持久化中受益的东西做一个简单的例子。事实上,我认为调用
persist
是这里的关键字。调用
persist
将大大加快我的工作速度,但代价是我必须将所有依赖于持久化
DataFrame
的代码紧密结合起来。 - 即使它们在逻辑上是分开的。
最佳答案
我想你可以订阅Apache Spark
, databricks
youtube上的 channel ,多听多了解,特别是从别人那里吸取经验教训。
slide
SparkUI Visualization slide
Spark in Production: Lessons from 100+ Production Users slide
Building, Debugging, and Tuning Spark Machine Learning Pipelines slide
Top 5 mistakes when writing Spark applications slide
Tuning and Debugging Apache Spark slide
A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks) 关于apache-spark - Spark 代码组织和最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32777014/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!