
python - Encode and assemble multiple features in PySpark


I have a Python class that I'm using to load and process some data in Spark. Among the various things I need to do, I'm generating a list of dummy variables derived from various columns in a Spark dataframe. My problem is that I'm not sure how to properly define a user-defined function to accomplish what I need.

I do currently have a method that, when mapped over the underlying dataframe RDD, solves half the problem (remember that this is a method in a larger data_processor class):

def build_feature_arr(self, table):
    # this dict has keys for all the columns for which I need dummy coding
    categories = {'gender': ['1', '2'], ..}

    # there are actually two different dataframes that I need to do this for;
    # this just specifies which one I'm looking at, and grabs the relevant
    # features from a config file
    if table == 'users':
        iter_over = self.config.dyadic_features_to_include
    elif table == 'activity':
        iter_over = self.config.user_features_to_include

    def _build_feature_arr(row):
        result = []
        row = row.asDict()
        for col in iter_over:
            column_value = str(row[col]).lower()
            cats = categories[col]
            result += [1 if column_value and cat == column_value else 0
                       for cat in cats]
        return result
    return _build_feature_arr

Essentially, for the specified dataframe, this takes the categorical-variable values for the specified columns and returns a list of the values of these new dummy variables. That means the following code:

data = data_processor(init_args)
result = data.user_data.rdd.map(data.build_feature_arr('users'))

returns something like:

In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]

In terms of generating the list of dummy variables I want, this does exactly what I need, but here is my question: how can I either (a) make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose), or (b) take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

Either way, what I need to do is generate a new dataframe containing the columns from user_data, along with a new column (let's call it feature_array) containing the output of the function above (or something functionally equivalent).

Best Answer

Spark >= 2.3, >= 3.0

Since Spark 2.3, OneHotEncoder has been deprecated in favor of OneHotEncoderEstimator. If you use a recent release, please modify the encoder code:

from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["gender_numeric"],
    outputCols=["gender_vector"]
)

In Spark 3.0 this variant has been renamed to OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["gender_numeric"],
    outputCols=["gender_vector"]
)

Additionally, StringIndexer has been extended to support multiple input columns:

StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
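
Putting these pieces together, here is a minimal sketch of the Spark 3.0 flow (it assumes a SparkSession named spark; the toy data mirrors the example used in the Spark < 2.3 section below):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

df = spark.createDataFrame(
    [("0", 3.0), ("1", 1.0), ("1", -1.0), ("0", -3.0)],
    ["gender", "foo"])

indexer = StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
encoder = OneHotEncoder(inputCols=["gender_numeric"], outputCols=["gender_vector"])

# Unlike the legacy pre-2.3 OneHotEncoder, both of these are Estimators,
# so each has to be fitted before it can transform:
indexed = indexer.fit(df).transform(df)
encoded = encoder.fit(indexed).transform(indexed)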

Spark < 2.3

Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
    row("0", 3.0, DenseVector([0, 2.1, 1.0])),
    row("1", 1.0, DenseVector([0, 1.1, 1.0])),
    row("1", -1.0, DenseVector([0, 3.4, 0.0])),
    row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

First, StringIndexer:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## | 0| 3.0| 0.0|
## | 1| 1.0| 1.0|
## | 1|-1.0| 1.0|
## | 0|-3.0| 0.0|
## +------+----+--------------+

Next, OneHotEncoder (by default it drops the last category level, which is why the 1.0 rows below encode to an empty one-element vector):

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## | 0| 3.0| 0.0|(1,[0],[1.0])|
## | 1| 1.0| 1.0| (1,[],[])|
## | 1|-1.0| 1.0| (1,[],[])|
## | 0|-3.0| 0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+

VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features")

final_df = assembler.transform(encoded_df)

If bar contained categorical variables, you could use VectorIndexer to set the required metadata:

from pyspark.ml.feature import VectorIndexer

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

but that is not the case here.

Finally, you can wrap all of that using a Pipeline:

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)

Arguably, this is much more robust and clean than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
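
For example, one simple way to keep the encoding consistent is to fit the pipeline once and reuse the same fitted model on every dataset (a sketch; train_df, new_df, and the save path are hypothetical names, and in recent releases StringIndexer's handleInvalid="keep" can additionally tolerate labels unseen during fitting):

model = pipeline.fit(train_df)
encoded_train = model.transform(train_df)
encoded_new = model.transform(new_df)  # same fitted indices, same dummy positions

# persist the fitted model so later jobs reuse the exact same encoding
model.write().overwrite().save("/tmp/feature_pipeline")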

Another way to get comparable output is RFormula, which:

RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.

from pyspark.ml.feature import RFormula

rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)

As you can see, it is much more concise, but it is harder to compose and doesn't allow much customization (the - 1 in the formula above simply removes the intercept term). Nevertheless, the result for a simple pipeline like this one will be identical:

final_df_rf.select("features").show(4, False)

## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+


final_df.select("features").show(4, False)

## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+

Regarding your questions:

make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)

It is just a UDF like any other. Just make sure you use supported types, and beyond that everything should work just fine.
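
For instance, a minimal sketch of such a UDF, reusing the example df from above and a hypothetical categories dict modeled on the one in the question:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

categories = {'gender': ['1', '2']}  # hypothetical, as in the question

def dummy_code(value, cats):
    # returns a 0/1 indicator for each known category of the column
    value = str(value).lower()
    return [1 if value == cat else 0 for cat in cats]

dummy_gender = udf(lambda v: dummy_code(v, categories['gender']),
                   ArrayType(IntegerType()))

df.withColumn("gender_dummies", dummy_gender(df["gender"])).show()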

take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)

Note:

For Spark 1.x, replace pyspark.ml.linalg with pyspark.mllib.linalg.
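
If you would rather attach the output to user_data itself than build a standalone dataframe, one possible sketch (reusing Row and DenseVector from the imports above) is to zip the source RDD with result; the two line up because result came from a map over that same RDD:

def attach_features(pair):
    src_row, feature_arr = pair
    d = src_row.asDict()
    d["feature_array"] = DenseVector(feature_arr)
    return Row(**d)

# note: Row(**kwargs) sorts field names alphabetically in older releases
user_data_with_features = (data.user_data.rdd
    .zip(result)
    .map(attach_features)
    .toDF())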

The original question and answer on encoding and assembling multiple features in PySpark can be found on Stack Overflow: https://stackoverflow.com/questions/32982425/
