python - One-hot encoding multiple string categorical features using Spark DataFrames

My goal is to one-hot encode a list of categorical columns using Spark DataFrames, i.e. to do the same thing the get_dummies() function does in Pandas.

The dataset, bureau.csv, was originally taken from the Kaggle competition Home Credit Default Risk. Below is a sample of my entry table, say entryData, filtered so that only KEY = 100001 is kept.

# primary key
KEY = 'SK_ID_CURR'
data = spark.read.csv("bureau.csv", header=True, inferSchema=True)
# sample data from bureau.csv of 1716428 rows
entryData = data.select(columnList).where(F.col(KEY) == 100001)
entryData.show()
+----------+-------------+---------------+---------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY| CREDIT_TYPE|
+----------+-------------+---------------+---------------+
| 100001| Closed| currency 1|Consumer credit|
| 100001| Closed| currency 1|Consumer credit|
| 100001| Closed| currency 1|Consumer credit|
| 100001| Closed| currency 1|Consumer credit|
| 100001| Active| currency 1|Consumer credit|
| 100001| Active| currency 1|Consumer credit|
| 100001| Active| currency 1|Consumer credit|
+----------+-------------+---------------+---------------+

I would like to one-hot encode the columns in columnList by creating a function catg_encode(entryData, columnList),

columnList = cols_type(entryData, obj=True)[1:]
print(columnList)
['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']

Note: cols_type() is a function that returns a list of columns, either the categorical columns (if obj=True) or the numeric columns (if obj=False).
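
A minimal sketch of such a helper, based on df.dtypes, could look as follows (this is only an illustration, not the actual cols_type() used here; judging by the [1:] slice below, the real helper may also return the key column first):

# assumed sketch of cols_type(); the real implementation is not shown here
def cols_type(df, obj=True):
    # df.dtypes is a list of (column_name, dtype_string) pairs
    if obj:
        return [name for name, dtype in df.dtypes if dtype == 'string']
    return [name for name, dtype in df.dtypes if dtype != 'string']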

I have managed to encode the first column, 'CREDIT_ACTIVE', but I could not do it for all the columns at once, i.e. build the function catg_encode.

# import necessary modules
from pyspark.sql import functions as F

# look for all distinct categories within a given feature (here 'CREDIT_ACTIVE')
categories = entryData.select(columnList[0]).distinct().rdd.flatMap(lambda x: x).collect()
# one-hot encode the categories
exprs = [F.when(F.col(columnList[0]) == category, 1).otherwise(0).alias(category) for category in categories]
# nice table with encoded feature 'CREDIT_ACTIVE'
oneHotEncode = entryData.select(KEY, *exprs)
oneHotEncode.show()
+----------+--------+----+------+------+
|SK_ID_CURR|Bad debt|Sold|Active|Closed|
+----------+--------+----+------+------+
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 1| 0|
| 100001| 0| 0| 1| 0|
| 100001| 0| 0| 1| 0|
+----------+--------+----+------+------+

Here the feature 'CREDIT_ACTIVE' has 4 distinct categories: ['Bad debt', 'Sold', 'Active', 'Closed'].
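
In other words, what I am after is something along the lines of the sketch below (illustration only; it naively collects the distinct values of each column one at a time, so it triggers one pass over the data per column, and it assumes the category labels do not collide across columns):

def catg_encode(df, columnList):
    exprs = []
    for column in columnList:
        # collect the distinct categories of this column
        categories = df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
        # one 0/1 indicator column per category, as in the single-column example above
        exprs += [F.when(F.col(column) == category, 1).otherwise(0).alias(category)
                  for category in categories]
    return df.select(KEY, *exprs)

oneHotEncoded = catg_encode(entryData, columnList)
oneHotEncoded.show()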

Note: I have even tried IndexToString and OneHotEncoderEstimator, but they did not help with this particular task.

I expect to get the following output,
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
|SK_ID_CURR|Bad debt|Sold|Active|Closed|currency 1|currency 2|currency 3|currency 4|..........|...
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 1| 0| 1| 0| 0| 0| ..|
| 100001| 0| 0| 1| 0| 1| 0| 0| 0| ..|
| 100001| 0| 0| 1| 0| 1| 0| 0| 0| ..|
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---

The trailing dots ... stand for the remaining categories of the feature 'CREDIT_TYPE', which are
['Loan for the purchase of equipment', 'Cash loan (non-earmarked)', 'Microloan', 'Consumer credit', 'Mobile operator loan', 'Another type of loan', 'Mortgage', 'Interbank credit', 'Loan for working capital replenishment', 'Car loan', 'Real estate loan', 'Unknown type of loan', 'Loan for business development', 'Credit card', 'Loan for purchase of shares (margin lending)'] .

Note: I have seen the post E-num / get Dummies in pyspark, but in a big-data setting it does not automate the process across many columns. That post gives a solution where separate code is written for each categorical feature, which is not what my problem needs.

Best answer

There are two ways of squeezing the juice out of this particular lemon. Let's take a look at them.

  • Pivot and join:

    import pyspark.sql.functions as f

    df1 = spark.sparkContext.parallelize([
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100002, 'Active', 'currency 2', 'Consumer credit'],
    ]).toDF(['SK_ID_CURR', 'CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE'])

    # this can be done dynamically, but I don't have all categories
    categories = ['Active', 'Closed', 'Bad debt', 'Sold']
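    # if the full set of categories is not known in advance, it could also be
    # collected dynamically, for example:
    # categories = [row[0] for row in df1.select('CREDIT_ACTIVE').distinct().collect()]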

    # we need to pivot without aggregation, so I need to add an `id` column and group by it as well
    credit_groups = (
    df1.withColumn('id', f.monotonically_increasing_id())
    .groupBy('SK_ID_CURR', 'id')
    .pivot('CREDIT_ACTIVE', values=categories)
    .agg(f.lit(1))
    .drop('id')
    )

    # currency groups are just a 1 for each currency and ID, as per the example data
    # if this is not the case, something more clever needs to be here
    currency_groups = df1.groupBy('SK_ID_CURR').pivot('CREDIT_CURRENCY').agg(f.lit(1))

    # join the two pivoted tables on the ID and fill nulls to zeroes
    credit_groups.join(currency_groups, on=['SK_ID_CURR'], how='inner').na.fill(0).show()

    +----------+------+------+--------+----+----------+----------+
    |SK_ID_CURR|Active|Closed|Bad debt|Sold|currency 1|currency 2|
    +----------+------+------+--------+----+----------+----------+
    | 100002| 1| 0| 0| 0| 0| 1|
    | 100001| 0| 1| 0| 0| 1| 0|
    | 100001| 1| 0| 0| 0| 1| 0|
    | 100001| 1| 0| 0| 0| 1| 0|
    | 100001| 0| 1| 0| 0| 1| 0|
    | 100001| 0| 1| 0| 0| 1| 0|
    | 100001| 1| 0| 0| 0| 1| 0|
    | 100001| 0| 1| 0| 0| 1| 0|
    +----------+------+------+--------+----+----------+----------+
  • Use StringIndexer and OneHotEncoderEstimator, for example:
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

    indexers = [StringIndexer(inputCol=column, outputCol=column+"_NUMERIC").fit(df1) for column in ['CREDIT_ACTIVE', 'CREDIT_CURRENCY']]

    pipeline = Pipeline(stages=indexers)
    df_indexed = pipeline.fit(df1).transform(df1)
    df_indexed.show()

    +----------+-------------+---------------+---------------+---------------------+-----------------------+
    |SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY| CREDIT_TYPE|CREDIT_ACTIVE_NUMERIC|CREDIT_CURRENCY_NUMERIC|
    +----------+-------------+---------------+---------------+---------------------+-----------------------+
    | 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
    | 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
    | 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
    | 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
    | 100001| Active| currency 1|Consumer credit| 1.0| 0.0|
    | 100001| Active| currency 1|Consumer credit| 1.0| 0.0|
    | 100001| Active| currency 1|Consumer credit| 1.0| 0.0|
    | 100002| Active| currency 2|Consumer credit| 1.0| 1.0|
    +----------+-------------+---------------+---------------+---------------------+-----------------------+

From here on, you can apply one-hot encoding to the newly created numeric columns. Personally I recommend route 1, as it is more readable. Route 2, however, lets you chain the OneHotEncoderEstimator into the declared Pipeline as well, making the code executable from a single line after the declarations. Hope this helps.
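
As a rough sketch of that chaining (illustration only: the _ONEHOT column names are made up here, the encoder outputs one sparse vector column per input rather than separate 0/1 columns, and OneHotEncoderEstimator was later renamed to OneHotEncoder in Spark 3.x):

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

categoricalCols = ['CREDIT_ACTIVE', 'CREDIT_CURRENCY']

# unfitted StringIndexers can be used directly as pipeline stages
indexers = [StringIndexer(inputCol=c, outputCol=c + "_NUMERIC") for c in categoricalCols]
encoder = OneHotEncoderEstimator(
    inputCols=[c + "_NUMERIC" for c in categoricalCols],
    outputCols=[c + "_ONEHOT" for c in categoricalCols])

# a single fit/transform call then runs the whole declared Pipeline
pipeline = Pipeline(stages=indexers + [encoder])
df_encoded = pipeline.fit(df1).transform(df1)
df_encoded.select('SK_ID_CURR', *[c + "_ONEHOT" for c in categoricalCols]).show(truncate=False)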

Regarding python - one-hot encoding multiple string categorical features using Spark DataFrames, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58995226/
