
pandas - Make groupby.apply more efficient or convert to Spark


All,

I am using pandas groupby.apply with my own custom function. However, I've noticed that the function is very, very slow. Can someone help me convert this code to apply to a Spark dataframe?

Adding a simple example for people to work with:

import pandas as pd
import operator

df = pd.DataFrame({
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
})
def get_stats(data_frame):

    # For each grouped data_frame, cut off all Sounds greater than the 99th percentile
    cutoff_99 = data_frame[data_frame.Sounds <= data_frame.Sounds.quantile(.99)]

    # Based on the total number of records, select the most-abundant sers
    sers_to_use = max((cutoff_99.Sers.value_counts() / cutoff_99.shape[0]).to_dict().items(), key=operator.itemgetter(1))[0]

    # Give me the average sound of the selected sers
    avg_sounds_of_sers_to_use = cutoff_99.loc[cutoff_99["Sers"] == sers_to_use].Sounds.mean()

    # Pre-allocate lists
    cool = []
    mean_sounds = []
    ratios = []
    _difference = []

    for i in cutoff_99.Sers.unique():
        # add each unique sers of that dataframe
        cool.append(i)

        # get the mean sound of that sers
        sers_mean_sounds = (cutoff_99.loc[cutoff_99["Sers"] == i].Sounds).mean()

        # add each mean sound for each sers
        mean_sounds.append(sers_mean_sounds)

        # get the ratio of the sers to use vs. the current sers; add all of the ratios to the list
        ratios.append(avg_sounds_of_sers_to_use / sers_mean_sounds)

        # get the percent difference and add it to the list
        _difference.append(
            float(
                round(
                    abs(avg_sounds_of_sers_to_use - sers_mean_sounds)
                    / ((avg_sounds_of_sers_to_use + sers_mean_sounds) / 2),
                    2,
                )
                * 100
            )
        )

    # return a series with these lists/values
    return pd.Series({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    })

I call the function in pandas as follows: df.groupby('Instruments').apply(get_stats)
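As an aside, the most-abundant-Sers selection inside get_stats can be written more simply; dividing the counts by the row count never changes which label wins, so the following one-liner (a minimal equivalent, tie-breaking aside) returns the same value:

# Equivalent selection of the most-abundant Sers: value_counts() sorts by
# frequency, so idxmax() returns the top label without the ratio/itemgetter step.
sers_to_use = cutoff_99.Sers.value_counts().idxmax()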

Best Answer

You can implement all of this with pyspark and window functions, as shown below.

Create the pyspark dataframe:

import pandas as pd

data = {
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
}

pddf = pd.DataFrame(data)
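# Assumes an active SparkSession named `spark` (as in the pyspark shell or a
# notebook); otherwise create one first, e.g.:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.getOrCreate()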

df = spark.createDataFrame(pddf)
df.show()

Output:

+-----------+----+------+
|Instruments|Sers|Sounds|
+-----------+----+------+
| A|Wind| 42|
| B|Tool| 21|
| A|Wind| 34|
| B|Wind| 56|
| A|Tool| 43|
| C|Tool| 61|
| C|Tool| 24|
| B|Wind| 23|
+-----------+----+------+

The calculation:

from pyspark.sql import Window
from pyspark.sql import functions as F

wI = Window.partitionBy('Instruments')
wIS = Window.partitionBy('Instruments', 'Sers')

df = df.withColumn('q', F.expr('percentile_approx(Sounds, 0.99)').over(wI))
df = df.filter(df.Sounds < df.q)
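# Note: unlike pandas' interpolated quantile(.99), percentile_approx returns an
# actual value from the group, which is why the strict '<' above is what drops
# the top observation. For behavior closer to pandas (an untested sketch,
# assuming Spark SQL's exact 'percentile' aggregate suits your data volume),
# the two lines above could instead read:
# df = df.withColumn('q', F.expr('percentile(Sounds, 0.99)').over(wI))
# df = df.filter(df.Sounds <= df.q)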

#this is our marker for Chosen_Sers
#a higher number indicates that this is the most-abundant sers
df = df.withColumn('tmpCount', F.count('Sers').over(wIS))
#this is the most-abundant sers as string
df = df.withColumn('Chosen_Sers', F.first('Sers').over(wI.orderBy(F.desc('tmpCount'))))

#mean sound for each sers within an instrument
df = df.withColumn('Average_Sounds_99_Percent', F.mean('Sounds').over(wIS))
#mean sound of the chosen sers
df = df.withColumn('avg_sounds_of_sers_to_use', F.first(F.col('Average_Sounds_99_Percent')).over(wI.orderBy(F.desc('tmpCount'))))
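# Caveat (illustrative, not in the original answer): when two Sers tie on
# tmpCount, first() over the ordered window is non-deterministic. A secondary
# sort key makes both picks above reproducible, e.g.:
# wI.orderBy(F.desc('tmpCount'), F.asc('Sers'))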

df = df.withColumn('mean_ratios', F.col('avg_sounds_of_sers_to_use')/F.mean('Sounds').over(wIS))
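# (Equivalent and slightly cheaper: reuse the column computed above instead of
#  re-running the window mean:
#  F.col('avg_sounds_of_sers_to_use') / F.col('Average_Sounds_99_Percent'))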

df = df.withColumn(
    'percent_differences',
    100 * F.round(
        F.abs(F.col('avg_sounds_of_sers_to_use') - F.col('Average_Sounds_99_Percent'))
        / ((F.col('avg_sounds_of_sers_to_use') + F.col('Average_Sounds_99_Percent')) / 2),
        2
    )
)
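# Worked check against the output below, instrument B / Sers 'Wind':
# abs(21.0 - 23.0) / ((21.0 + 23.0) / 2) = 2 / 22 ≈ 0.0909 -> round(., 2) = 0.09 -> 9.0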

#at this point we have a flat table
df.show()

#now we create the desired structure and drop all unneeded columns
df.dropDuplicates(['Instruments', 'Sers']).groupby('Instruments', 'Chosen_Sers').agg(
    F.collect_list('Sers').alias('Sers'),
    F.collect_list('Average_Sounds_99_Percent').alias('Average_Sounds_99_Percent'),
    F.collect_list('mean_ratios').alias('mean_ratios'),
    F.collect_list('percent_differences').alias('percent_differences')
).show(truncate=False)
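# Caveat (not in the original answer): collect_list makes no ordering
# guarantee, so the four lists stay aligned only because every aggregate sees
# the rows in the same order. A more defensive sketch collects one struct per
# Sers and sorts it, keeping the fields aligned by construction:
# (df.dropDuplicates(['Instruments', 'Sers'])
#    .groupby('Instruments', 'Chosen_Sers')
#    .agg(F.sort_array(F.collect_list(F.struct(
#        'Sers', 'Average_Sounds_99_Percent',
#        'mean_ratios', 'percent_differences'))).alias('stats'))
#  ).show(truncate=False)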

Output:

#just a flat table
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
|Instruments|Sers|Sounds| q|tmpCount|Chosen_Sers|Average_Sounds_99_Percent|avg_sounds_of_sers_to_use| mean_ratios|percent_differences|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
| B|Tool| 21| 56| 1| Tool| 21.0| 21.0| 1.0| 0.0|
| B|Wind| 23| 56| 1| Tool| 23.0| 21.0|0.9130434782608695| 9.0|
| C|Tool| 24| 61| 1| Tool| 24.0| 24.0| 1.0| 0.0|
| A|Wind| 42| 43| 2| Wind| 38.0| 38.0| 1.0| 0.0|
| A|Wind| 34| 43| 2| Wind| 38.0| 38.0| 1.0| 0.0|
+-----------+----+------+---+--------+-----------+-------------------------+-------------------------+------------------+-------------------+
#desired structure
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|Instruments|Chosen_Sers|Sers |Average_Sounds_99_Percent|mean_ratios |percent_differences|
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
|B |Tool |[Tool, Wind]|[21.0, 23.0] |[1.0, 0.9130434782608695]|[0.0, 9.0] |
|C |Tool |[Tool] |[24.0] |[1.0] |[0.0] |
|A |Wind |[Wind] |[38.0] |[1.0] |[0.0] |
+-----------+-----------+------------+-------------------------+-------------------------+-------------------+
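If you would rather keep the original pandas logic untouched, PySpark 3.x also offers groupBy().applyInPandas, which runs a pandas function once per group on the executors. The following is a minimal sketch, assuming the question's get_stats and the spark session from above are in scope; get_stats_wrapped is a hypothetical wrapper that converts the returned Series into the one-row pandas DataFrame that applyInPandas expects:

from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, DoubleType)

# explicit result schema for the per-group arrays
schema = StructType([
    StructField('Instruments', StringType()),
    StructField('Chosen_Sers', StringType()),
    StructField('Cools', ArrayType(StringType())),
    StructField('Average_Sounds_99_Percent', ArrayType(DoubleType())),
    StructField('Mean_Ratios', ArrayType(DoubleType())),
    StructField('Percent_Differences', ArrayType(DoubleType())),
])

def get_stats_wrapped(pdf):
    # pdf is a plain pandas DataFrame holding one Instruments group
    s = get_stats(pdf)
    return pd.DataFrame([{
        'Instruments': pdf['Instruments'].iloc[0],
        'Chosen_Sers': s['Chosen_Sers'],
        'Cools': list(s['Cools']),
        'Average_Sounds_99_Percent': [float(x) for x in s['Average_Sounds_99_Percent']],
        'Mean_Ratios': [float(x) for x in s['Mean_Ratios']],
        'Percent_Differences': [float(x) for x in s['Percent_Differences']],
    }])

# run on a fresh copy, since the window steps above overwrote df
raw = spark.createDataFrame(pddf)
raw.groupBy('Instruments').applyInPandas(get_stats_wrapped, schema=schema).show(truncate=False)

This keeps the per-group Python loop, so it parallelizes across groups rather than vectorizing within them; the pure window-function rewrite above should still be faster per row.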

Regarding pandas - Make groupby.apply more efficient or convert to Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61083987/
