- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
相比 Scala,我更喜欢 Python。但是,由于 Spark 本身是用 Scala 编写的,我希望我的代码在 Scala 中运行得比 Python 版本更快,原因很明显。
有了这个假设,我想为大约 1 GB 的数据学习和编写一些非常常见的预处理代码的 Scala 版本。数据取自 SpringLeaf 竞赛 Kaggle .只是为了概述数据(它包含 1936 个维度和 145232 行)。数据由各种类型组成,例如整数,浮点数,字符串, bool 值。我使用 8 个内核中的 6 个进行 Spark 处理;这就是为什么我使用 minPartitions=6
这样每个核心都有一些东西要处理。
Scala 代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
map
) 和 Python (
reduceByKey
) 的不同阶段 0 函数)
最佳答案
可以在下面找到讨论代码的原始答案。
首先,您必须区分不同类型的 API,每种 API 都有自己的性能考虑。
RDD API
(具有基于 JVM 的编排的纯 Python 结构)
这是受 Python 代码性能和 PySpark 实现细节影响最大的组件。虽然 Python 性能不太可能成为问题,但您至少需要考虑几个因素:
spark.python.worker.reuse
选项可用于在为每个任务 fork Python 进程和重用现有进程之间进行选择。后一个选项似乎有助于避免昂贵的垃圾收集(它更像是一种印象而不是系统测试的结果),而前一个(默认)对于昂贵的广播和导入来说是最佳的。 DataFrames
之间传递不必要的数据。和
RDDs
.这需要昂贵的序列化和反序列化,更不用说与 Python 解释器之间的数据传输了。
from pyspark.sql.functions import col
col("foo")
Datasets
在 Python 中,即使有当前的 Scala 实现也太简单了,不能提供与
DataFrame
相同的性能优势。 .
mllib.linalg
, 提供了比 Scala 更全面的方法集。
Dataset
API,带有卡住的RDD API 给Python 用户带来了机遇和挑战。虽然 API 的高级部分在 Python 中更容易公开,但更高级的功能几乎不可能使用
直接 .
collection
但 UDF serde 是
long term goal )。
DataFrames
将数据公开给 native JVM 代码并读回结果。我已经解释了一些选项
somewhere else您可以在
How to use a Scala class inside Pyspark 中找到 Python-Scala 往返的工作示例.
(key, value)
使用
zipWithIndex
创建的对或
enumerate
创建字符串只是为了在之后立即拆分它有什么意义?
flatMap
不能递归地工作,因此您可以简单地生成元组并跳过以下
map
无论如何。
reduceByKey
.一般来说,
reduceByKey
如果应用聚合函数可以减少必须混洗的数据量,则很有用。由于您只是连接字符串,因此这里没有任何好处。忽略低级的东西,比如引用的数量,你必须传输的数据量与
groupByKey
完全相同。 .
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
相当于
input4.reduceByKey(valsConcat)
在您的代码中不是一个好主意。
groupByKey
您可以尝试使用
aggregateByKey
与
StringBuilder
.与此类似的事情应该可以解决问题:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
local[6]
模式(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器需要 4GB 内存(n = 3):
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])
关于scala - Scala 与 Python 的 Spark 性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32464122/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!