- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我面临着繁重的数据转换。简而言之,我有数据列,每个数据列都包含与一些序数相对应的字符串。例如,高
、中
和低
。我的目标是将这些字符串映射到整数以保留顺序。在本例中,为LOW -> 0
、MID -> 1
和HIGH -> 2
。
这是一个生成此类数据的简单函数:
def fresh_df(N=100000, seed=None):
np.random.seed(seed)
feat1 = np.random.choice(["HI", "LO", "MID"], size=N)
feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=N)
pdf = pd.DataFrame({
"feat1": feat1,
"feat2": feat2
})
return spark.createDataFrame(pdf)
我的第一个方法是:
feat1_dict = {"HI": 1, "MID": 2, "LO": 3}
feat2_dict = {"SMALL": 0, "MEDIUM": 1, "LARGE": 2}
mappings = {
"feat1": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),
"feat2": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])
}
for col in df.columns:
col_map = mappings[col]
df = df.withColumn(col+"_mapped", col_map[df[col]])
这按预期工作,但实际上它变得很慢,我想优化该过程。我读到pandas_udf
这给了我希望。下面是修改后的代码:
feats_dict = {
"feat1": feat1_dict,
"feat2": feat2_dict
}
for col_name in df.columns:
@F.pandas_udf('integer', F.PandasUDFType.SCALAR)
def map_map(col):
return col.map(feats_dict[col_name])
df = df.withColumn(col_name + "_mapped", map_map(df[col_name]))
唉!比较这两个版本时,执行时间没有任何改进。我在 Spark 本地实例(使用 docker)和 5 节点 EMR 集群(使用默认配置)上对两者进行了比较。
我创建了一个notebook您可以在其中看到所有代码。一般来说,我使用了以下导入:
import numpy as np
import pandas as pd
from itertools import chain
from pyspark.sql import functions as F
我错过了什么?为什么这个过程如此缓慢,为什么使用 pandas_udf
时没有任何改进?
最佳答案
为什么这么慢?因为 Spark 在 JVM 中运行,而 pyspark 则不在 JVM 中运行(因为它是一个 Python 进程),为了使该进程成为可能,需要将所有数据序列化和反序列化移动到 JVM。
您可以使用 when
和 otherwise
函数映射值,避免序列化和反序列化过程,从而提高性能。
import numpy as np
import pandas as pd
import pyspark.sql.functions as f
from pyspark.shell import spark
def fresh_df(n=100000, seed=None):
np.random.seed(seed)
feat1 = np.random.choice(["HI", "LO", "MID"], size=n)
feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=n)
pdf = pd.DataFrame({
"feat1": feat1,
"feat2": feat2
})
return spark.createDataFrame(pdf)
df = fresh_df()
df = df.withColumn('feat1_mapped', f
.when(df.feat1 == f.lit('HI'), 1)
.otherwise(f.when(df.feat1 == f.lit('MID'), 2).otherwise(3)))
df = df.withColumn('feat2_mapped', f
.when(df.feat2 == f.lit('SMALL'), 0)
.otherwise(f.when(df.feat2 == f.lit('MEDIUM'), 1).otherwise(2)))
df.show(n=20)
输出
+-----+------+------------+------------+
|feat1| feat2|feat1_mapped|feat2_mapped|
+-----+------+------------+------------+
| LO| SMALL| 3| 0|
| LO|MEDIUM| 3| 1|
| MID|MEDIUM| 2| 1|
| MID| SMALL| 2| 0|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| LO| SMALL| 3| 0|
| MID| LARGE| 2| 2|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| LO| LARGE| 3| 2|
| HI|MEDIUM| 1| 1|
| LO| SMALL| 3| 0|
| HI|MEDIUM| 1| 1|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| HI| SMALL| 1| 0|
| HI| LARGE| 1| 2|
| MID| LARGE| 2| 2|
+-----+------+------------+------------+
关于python - Spark pandas_udf 并不更快,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56930974/
我在使用以下代码时遇到 pandas_udf 错误。代码是基于另一列创建具有数据类型的列。相同的代码适用于正常较慢的 udf(已注释掉)。 基本上任何更复杂的“字符串”+数据都会返回错误。 # fro
我面临着繁重的数据转换。简而言之,我有数据列,每个数据列都包含与一些序数相对应的字符串。例如,高、中和低。我的目标是将这些字符串映射到整数以保留顺序。在本例中,为LOW -> 0、MID -> 1 和
我正在尝试制作一个 pandas UDF,它接受两列整数值,并根据这些值之间的差异返回一个小数数组,其长度等于上述差异。 到目前为止,这是我的尝试,我一直在尝试各种不同的方法来让它发挥作用,但这是总体
我正在附加到 AWS EMR 实例的 jupyter notebook 上尝试一些与 pyspark 相关的实验。我有一个 spark 数据框,它从 s3 读取数据,然后过滤掉一些东西。使用 df1.
我开始在本地玩 Spark 并发现这个奇怪的问题 1) pip install pyspark==2.3.1 2)pyspark> 将 Pandas 导入为 pd 从 pyspark.sql.func
This answer很好地解释了如何使用 pyspark 的 groupby 和 pandas_udf 进行自定义聚合。但是,我不可能像示例的这一部分所示那样手动声明我的架构 from pyspar
我正在使用 pandas_udf 在我的 Spark 集群上应用机器学习模型,并且有兴趣预定义通过箭头发送到 UDF 的最小记录数。 我遵循了大部分 UDF 的 databricks 教程... ht
我创建了一个 Pandas UDF,它将输入一个数据帧,在 Primary_Key 和 Predictions 上预测并输出一个数据帧。 schema = StructType([StructFiel
我写了一个UDF。它非常慢。我想用 pandas_udf 替换它以利用矢量化。 实际的 udf 有点复杂,但我创建了一个简化的玩具版本。 我的问题:是否可以将玩具示例中的 UDF 替换为可以利用矢量化
我已经测试过 logger和 print无法在 pandas_udf 中打印消息,无论是集群模式还是客户端模式。 测试代码: import sys import numpy as np import
我有这个 df: df = spark.createDataFrame( [('row_a', 5.0, 0.0, 11.0), ('row_b', 3394.0, 0.0, 454
我在 Jupyter 笔记本中运行以下代码,但出现 ImportError。请注意,“udf”可以导入到 Jupyter 中。 从 pyspark.sql.functions 导入 pandas_ud
我有这个 df: df = spark.createDataFrame( [('row_a', 5.0, 0.0, 11.0), ('row_b', 3394.0, 0.0, 454
可以使用外部库,例如 textdistance在pandas_udf里面?我已经尝试过,但收到此错误: ValueError: The truth value of a Series is ambig
我目前正在使用 PySpark 开发我的第一个完整系统,我遇到了一些奇怪的、与内存相关的问题。在其中一个阶段,我想类似于 Split-Apply-Combine 策略以修改 DataFrame。也就是
我正在使用 PySpark 的新 pandas_udf 装饰器,我试图让它将多个列作为输入并返回一个系列作为输入,但是,我收到一个 TypeError : 无效参数 示例代码 @pandas_udf(
我正在使用 PySpark 的新 pandas_udf 装饰器,我试图让它将多个列作为输入并返回一个系列作为输入,但是,我收到一个 TypeError : 无效参数 示例代码 @pandas_udf(
我无法从可用的 Pyspark 文档中复制 Spark 代码 here. 例如,当我尝试以下与 Grouped Map 有关的代码时: import numpy as np import pandas
我正在尝试将函数应用于 pyspark 中的每个数据集组。我遇到的第一个错误是 Py4JError: An error occurred while calling o62.__getnewargs_
我正在构建多个 Prophet 模型,其中每个模型都传递给 pandas_udf 函数,该函数训练模型并使用 MLflow 存储结果。 @pandas_udf(result_schema, Panda
我是一名优秀的程序员,十分优秀!