gpt4 book ai didi

apache-spark - PySpark,DataFrame 的顶部

转载 作者:行者123 更新时间:2023-12-04 04:37:29 26 4
gpt4 key购买 nike

我想要做的是给定一个 DataFrame,根据某个指定的列取前 n 个元素。 RDD API 中的 top(self, num) 正是我想要的。我想知道 DataFrame 世界中是否有等效的 API?

我的第一次尝试如下

def retrieve_top_n(df, n):
# assume we want to get most popular n 'key' in DataFrame
return df.groupBy('key').count().orderBy('count', ascending=False).limit(n).select('key')

但是,我已经意识到这会导致不确定的行为(我不知道确切的原因,但我猜 limit(n) 并不能保证采用哪个 n)

最佳答案

首先让我们定义一个函数来生成测试数据:

import numpy as np

def sample_df(num_records):
def data():
np.random.seed(42)
while True:
yield int(np.random.normal(100., 80.))

data_iter = iter(data())
df = sc.parallelize((
(i, next(data_iter)) for i in range(int(num_records))
)).toDF(('index', 'key_col'))

return df

sample_df(1e3).show(n=5)
+-----+-------+
|index|key_col|
+-----+-------+
| 0| 139|
| 1| 88|
| 2| 151|
| 3| 221|
| 4| 81|
+-----+-------+
only showing top 5 rows

现在,让我们提出三种不同的计算 TopK 的方法:
from pyspark.sql import Window
from pyspark.sql import functions


def top_df_0(df, key_col, K):
"""
Using window functions. Handles ties OK.
"""
window = Window.orderBy(functions.col(key_col).desc())
return (df
.withColumn("rank", functions.rank().over(window))
.filter(functions.col('rank') <= K)
.drop('rank'))


def top_df_1(df, key_col, K):
"""
Using limit(K). Does NOT handle ties appropriately.
"""
return df.orderBy(functions.col(key_col).desc()).limit(K)


def top_df_2(df, key_col, K):
"""
Using limit(k) and then filtering. Handles ties OK."
"""
num_records = df.count()
value_at_k_rank = (df
.orderBy(functions.col(key_col).desc())
.limit(k)
.select(functions.min(key_col).alias('min'))
.first()['min'])

return df.filter(df[key_col] >= value_at_k_rank)

函数名为 top_df_1类似于您最初实现的那个。它给你非确定性行为的原因是它不能很好地处理关系。如果您有大量数据并且只对近似答案感兴趣以提高性能,这可能是一件好事。

最后,让我们进行基准测试

对于基准测试,使用具有 400 万个条目的 Spark DF 并定义一个便利函数:
NUM_RECORDS = 4e6
test_df = sample_df(NUM_RECORDS).cache()

def show(func, df, key_col, K):
func(df, key_col, K).select(
functions.max(key_col),
functions.min(key_col),
functions.count(key_col)
).show()

让我们看看判决:
%timeit show(top_df_0, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
| 502| 420| 108|
+------------+------------+--------------+

1 loops, best of 3: 1.62 s per loop


%timeit show(top_df_1, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
| 502| 420| 100|
+------------+------------+--------------+

1 loops, best of 3: 252 ms per loop


%timeit show(top_df_2, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
| 502| 420| 108|
+------------+------------+--------------+

1 loops, best of 3: 725 ms per loop

(请注意, top_df_0top_df_2 在前 100 名中有 108 个条目。这是由于存在第 100 名的并列条目。 top_df_1 实现忽略了并列条目。)。

底线

如果您想得到准确的答案,请使用 top_df_2 (它比 top_df_0 好大约 2 倍)。如果您想要另一个 x2 的性能并且可以接受近似答案,请使用 top_df_1 .

关于apache-spark - PySpark,DataFrame 的顶部,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46008057/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com