- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我目前正在使用 PySpark 开发我的第一个完整系统,我遇到了一些奇怪的、与内存相关的问题。在其中一个阶段,我想类似于 Split-Apply-Combine 策略以修改 DataFrame。也就是说,我想对给定列定义的每个组应用一个函数,最后将它们全部组合起来。问题是,我要应用的函数是一种适用于“说” Pandas 惯用语的拟合模型的预测方法,即它被矢量化并以 Pandas 系列作为输入。
然后我设计了一个迭代策略,遍历组并手动应用 pandas_udf.Scalar 来解决问题。组合部分是使用对 DataFrame.unionByName() 的增量调用完成的。我决定不使用 GroupedMap 类型的 pandas_udf 因为文档声明内存应该由用户管理,并且当其中一个组可能太大而无法将其保存在内存中或由一个表示时,您应该特别小心 Pandas 数据框。
主要问题是所有处理似乎都运行良好,但最后我想将最终的 DataFrame 序列化为 Parquet 文件。正是在这一点上,我收到了很多关于 DataFrameWriter 的类似 Java 的错误,或者内存不足的异常。
我已经在 Windows 和 Linux 机器上尝试过代码。我设法避免错误的唯一方法是增加机器中的 --driver-memory 值。每个平台的最小值都不同,并且取决于问题的大小,这让我怀疑内存泄漏。
问题直到我开始使用pandas_udf 才出现。我认为在使用 pandas_udf 时,在整个 pyarrow 序列化过程中的某处可能存在内存泄漏。
我创建了一个最小的可重现示例。如果我直接使用 Python 运行这个脚本,它会产生错误。使用 spark-submit 并增加很多驱动程序内存,可以使其工作。
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp
# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
return x + 100.0
# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
"mre").master("local[3]").getOrCreate()
sc = spark.sparkContext
# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"
z = 105
m = 750000
schema = spktyp.StructType(
[spktyp.StructField("ID", spktyp.DoubleType(), True)]
)
df = spark.createDataFrame(
[(float(i),) for i in range(m)],
schema
)
for j in range(z):
df = df.withColumn(
f"N{j}",
F.col("ID") + float(j)
)
df = df.withColumn(
"X",
F.array(
F.lit("A"),
F.lit("B"),
F.lit("C"),
F.lit("D"),
F.lit("E")
).getItem(
(F.rand()*3).cast("int")
)
)
# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"
# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")
# Split and treat the first id -------------------------------------------------
first, *others = groups
cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
out_col,
predict(in_col)
)
# Traverse the remaining group ids ---------------------------------------------
for i, other in enumerate(others):
cur_df = df.filter(F.col(group_col) == other)
new_df = cur_df.withColumn(
out_col,
predict(in_col)
)
# Incremental union --------------------------------------------------------
result = result.unionByName(new_df)
# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)
令人震惊的是(至少对我而言),如果我在序列化语句之前调用 repartition(),问题似乎就消失了。
result = result.repartition(result.rdd.getNumPartitions())
result.write.mode("overwrite").parquet(out_path)
将这条线放到位后,我可以降低很多驱动程序内存配置,并且脚本运行良好。我几乎无法理解所有这些因素之间的关系,尽管我怀疑代码的惰性评估和 pyarrow 序列化可能是相关的。
这是我用于开发的当前环境:
arrow-cpp 0.13.0 py36hee3af98_1 conda-forge
asn1crypto 0.24.0 py36_1003 conda-forge
astroid 2.2.5 py36_0
atomicwrites 1.3.0 py_0 conda-forge
attrs 19.1.0 py_0 conda-forge
blas 1.0 mkl
boost-cpp 1.68.0 h6a4c333_1000 conda-forge
brotli 1.0.7 he025d50_1000 conda-forge
ca-certificates 2019.3.9 hecc5488_0 conda-forge
certifi 2019.3.9 py36_0 conda-forge
cffi 1.12.3 py36hb32ad35_0 conda-forge
chardet 3.0.4 py36_1003 conda-forge
colorama 0.4.1 py36_0
cryptography 2.6.1 py36hb32ad35_0 conda-forge
dill 0.2.9 py36_0
docopt 0.6.2 py36_0
entrypoints 0.3 py36_0
falcon 1.4.1.post1 py36hfa6e2cd_1000 conda-forge
fastavro 0.21.21 py36hfa6e2cd_0 conda-forge
flake8 3.7.7 py36_0
future 0.17.1 py36_1000 conda-forge
gflags 2.2.2 ha925a31_0
glog 0.3.5 h6538335_1
hug 2.5.2 py36hfa6e2cd_0 conda-forge
icc_rt 2019.0.0 h0cc432a_1
idna 2.8 py36_1000 conda-forge
intel-openmp 2019.3 203
isort 4.3.17 py36_0
lazy-object-proxy 1.3.1 py36hfa6e2cd_2
libboost 1.67.0 hd9e427e_4
libprotobuf 3.7.1 h1a1b453_0 conda-forge
lz4-c 1.8.1.2 h2fa13f4_0
mccabe 0.6.1 py36_1
mkl 2018.0.3 1
mkl_fft 1.0.6 py36hdbbee80_0
mkl_random 1.0.1 py36h77b88f5_1
more-itertools 4.3.0 py36_1000 conda-forge
ninabrlong 0.1.0 dev_0 <develop>
nose 1.3.7 py36_1002 conda-forge
nose-exclude 0.5.0 py_0 conda-forge
numpy 1.15.0 py36h9fa60d3_0
numpy-base 1.15.0 py36h4a99626_0
openssl 1.1.1b hfa6e2cd_2 conda-forge
pandas 0.23.3 py36h830ac7b_0
parquet-cpp 1.5.1 2 conda-forge
pip 19.0.3 py36_0
pluggy 0.11.0 py_0 conda-forge
progressbar2 3.38.0 py_1 conda-forge
py 1.8.0 py_0 conda-forge
py4j 0.10.7 py36_0
pyarrow 0.13.0 py36h8c67754_0 conda-forge
pycodestyle 2.5.0 py36_0
pycparser 2.19 py36_1 conda-forge
pyflakes 2.1.1 py36_0
pygam 0.8.0 py_0 conda-forge
pylint 2.3.1 py36_0
pyopenssl 19.0.0 py36_0 conda-forge
pyreadline 2.1 py36_1
pysocks 1.6.8 py36_1002 conda-forge
pyspark 2.4.1 py_0
pytest 4.5.0 py36_0 conda-forge
pytest-runner 4.4 py_0 conda-forge
python 3.6.6 hea74fb7_0
python-dateutil 2.8.0 py36_0
python-hdfs 2.3.1 py_0 conda-forge
python-mimeparse 1.6.0 py_1 conda-forge
python-utils 2.3.0 py_1 conda-forge
pytz 2019.1 py_0
re2 2019.04.01 vc14h6538335_0 [vc14] conda-forge
requests 2.21.0 py36_1000 conda-forge
requests-kerberos 0.12.0 py36_0
scikit-learn 0.20.1 py36hb854c30_0
scipy 1.1.0 py36hc28095f_0
setuptools 41.0.0 py36_0
six 1.12.0 py36_0
snappy 1.1.7 h777316e_3
sqlite 3.28.0 he774522_0
thrift-cpp 0.12.0 h59828bf_1002 conda-forge
typed-ast 1.3.1 py36he774522_0
urllib3 1.24.2 py36_0 conda-forge
vc 14.1 h0510ff6_4
vs2015_runtime 14.15.26706 h3a45250_0
wcwidth 0.1.7 py_1 conda-forge
wheel 0.33.1 py36_0
win_inet_pton 1.1.0 py36_0 conda-forge
wincertstore 0.2 py36h7fe50ca_0
winkerberos 0.7.0 py36_1
wrapt 1.11.1 py36he774522_0
xz 5.2.4 h2fa13f4_4
zlib 1.2.11 h62dcd97_3
zstd 1.3.3 hfe6a214_0
任何提示或帮助将不胜感激。
最佳答案
我想对你的帖子发表评论,但我的声誉太低了。
根据我的经验,udf 会大大降低你的性能,特别是如果你用 python(或 pandas?)编写它们。有一篇文章,为什么你不应该使用 python udfs 而是使用 scala udfs:https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9
在我的案例中,可以使用内置函数,即使它非常复杂,运行时间也比以前减少了大约 5%。
对于您的 OOM 错误以及为什么重新分区对您有效,我没有任何解释。我能给你的唯一建议是尽可能避免使用 UDF,尽管在你的情况下这似乎并不那么容易。
关于python - 使用 pandas_udf 和 Parquet 序列化时内存泄漏?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56329093/
我在网上搜索但没有找到任何合适的文章解释如何使用 javascript 使用 WCF 服务,尤其是 WebScriptEndpoint。 任何人都可以对此给出任何指导吗? 谢谢 最佳答案 这是一篇关于
我正在编写一个将运行 Linux 命令的 C 程序,例如: cat/etc/passwd | grep 列表 |剪切-c 1-5 我没有任何结果 *这里 parent 等待第一个 child (chi
所以我正在尝试处理文件上传,然后将该文件作为二进制文件存储到数据库中。在我存储它之后,我尝试在给定的 URL 上提供文件。我似乎找不到适合这里的方法。我需要使用数据库,因为我使用 Google 应用引
我正在尝试制作一个宏,将下面的公式添加到单元格中,然后将其拖到整个列中并在 H 列中复制相同的公式 我想在 F 和 H 列中输入公式的数据 Range("F1").formula = "=IF(ISE
问题类似于this one ,但我想使用 OperatorPrecedenceParser 解析带有函数应用程序的表达式在 FParsec . 这是我的 AST: type Expression =
我想通过使用 sequelize 和 node.js 将这个查询更改为代码取决于在哪里 select COUNT(gender) as genderCount from customers where
我正在使用GNU bash,版本5.0.3(1)-发行版(x86_64-pc-linux-gnu),我想知道为什么简单的赋值语句会出现语法错误: #/bin/bash var1=/tmp
这里,为什么我的代码在 IE 中不起作用。我的代码适用于所有浏览器。没有问题。但是当我在 IE 上运行我的项目时,它发现错误。 而且我的 jquery 类和 insertadjacentHTMl 也不
我正在尝试更改标签的innerHTML。我无权访问该表单,因此无法编辑 HTML。标签具有的唯一标识符是“for”属性。 这是输入和标签的结构:
我有一个页面,我可以在其中返回用户帖子,可以使用一些 jquery 代码对这些帖子进行即时评论,在发布新评论后,我在帖子下插入新评论以及删除 按钮。问题是 Delete 按钮在新插入的元素上不起作用,
我有一个大约有 20 列的“管道分隔”文件。我只想使用 sha1sum 散列第一列,它是一个数字,如帐号,并按原样返回其余列。 使用 awk 或 sed 执行此操作的最佳方法是什么? Accounti
我需要将以下内容插入到我的表中...我的用户表有五列 id、用户名、密码、名称、条目。 (我还没有提交任何东西到条目中,我稍后会使用 php 来做)但由于某种原因我不断收到这个错误:#1054 - U
所以我试图有一个输入字段,我可以在其中输入任何字符,但然后将输入的值小写,删除任何非字母数字字符,留下“。”而不是空格。 例如,如果我输入: 地球的 70% 是水,-!*#$^^ & 30% 土地 输
我正在尝试做一些我认为非常简单的事情,但出于某种原因我没有得到想要的结果?我是 javascript 的新手,但对 java 有经验,所以我相信我没有使用某种正确的规则。 这是一个获取输入值、检查选择
我想使用 angularjs 从 mysql 数据库加载数据。 这就是应用程序的工作原理;用户登录,他们的用户名存储在 cookie 中。该用户名显示在主页上 我想获取这个值并通过 angularjs
我正在使用 autoLayout,我想在 UITableViewCell 上放置一个 UIlabel,它应该始终位于单元格的右侧和右侧的中心。 这就是我想要实现的目标 所以在这里你可以看到我正在谈论的
我需要与 MySql 等效的 elasticsearch 查询。我的 sql 查询: SELECT DISTINCT t.product_id AS id FROM tbl_sup_price t
我正在实现代码以使用 JSON。 func setup() { if let flickrURL = NSURL(string: "https://api.flickr.com/
我尝试使用for循环声明变量,然后测试cols和rols是否相同。如果是,它将运行递归函数。但是,我在 javascript 中执行 do 时遇到问题。有人可以帮忙吗? 现在,在比较 col.1 和
我举了一个我正在处理的问题的简短示例。 HTML代码: 1 2 3 CSS 代码: .BB a:hover{ color: #000; } .BB > li:after {
我是一名优秀的程序员,十分优秀!