- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
相比 Scala,我更喜欢 Python。但是,由于 Spark 本身是用 Scala 编写的,我希望我的代码在 Scala 中运行得比 Python 版本更快,原因很明显。
有了这个假设,我想为大约 1 GB 的数据学习和编写一些非常常见的预处理代码的 Scala 版本。数据取自 SpringLeaf 竞赛 Kaggle .只是为了概述数据(它包含 1936 个维度和 145232 行)。数据由各种类型组成,例如整数,浮点数,字符串, bool 值。我使用 8 个内核中的 6 个进行 Spark 处理;这就是为什么我使用 minPartitions=6
这样每个核心都有一些东西要处理。
Scala 代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
map
) 和 Python (
reduceByKey
) 的不同阶段 0 函数)
最佳答案
可以在下面找到讨论代码的原始答案。
首先,您必须区分不同类型的 API,每种 API 都有自己的性能考虑。
RDD API
(具有基于 JVM 的编排的纯 Python 结构)
这是受 Python 代码性能和 PySpark 实现细节影响最大的组件。虽然 Python 性能不太可能成为问题,但您至少需要考虑几个因素:
spark.python.worker.reuse
选项可用于在为每个任务 fork Python 进程和重用现有进程之间进行选择。后一个选项似乎有助于避免昂贵的垃圾收集(它更像是一种印象而不是系统测试的结果),而前一个(默认)对于昂贵的广播和导入来说是最佳的。 DataFrames
之间传递不必要的数据。和
RDDs
.这需要昂贵的序列化和反序列化,更不用说与 Python 解释器之间的数据传输了。
from pyspark.sql.functions import col
col("foo")
Datasets
在 Python 中,即使有当前的 Scala 实现也太简单了,不能提供与
DataFrame
相同的性能优势。 .
mllib.linalg
, 提供了比 Scala 更全面的方法集。
Dataset
API,带有卡住的RDD API 给Python 用户带来了机遇和挑战。虽然 API 的高级部分在 Python 中更容易公开,但更高级的功能几乎不可能使用
直接 .
collection
但 UDF serde 是
long term goal )。
DataFrames
将数据公开给 native JVM 代码并读回结果。我已经解释了一些选项
somewhere else您可以在
How to use a Scala class inside Pyspark 中找到 Python-Scala 往返的工作示例.
(key, value)
使用
zipWithIndex
创建的对或
enumerate
创建字符串只是为了在之后立即拆分它有什么意义?
flatMap
不能递归地工作,因此您可以简单地生成元组并跳过以下
map
无论如何。
reduceByKey
.一般来说,
reduceByKey
如果应用聚合函数可以减少必须混洗的数据量,则很有用。由于您只是连接字符串,因此这里没有任何好处。忽略低级的东西,比如引用的数量,你必须传输的数据量与
groupByKey
完全相同。 .
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
相当于
input4.reduceByKey(valsConcat)
在您的代码中不是一个好主意。
groupByKey
您可以尝试使用
aggregateByKey
与
StringBuilder
.与此类似的事情应该可以解决问题:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
local[6]
模式(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器需要 4GB 内存(n = 3):
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])
关于scala - Scala 与 Python 的 Spark 性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32464122/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!