gpt4 book ai didi

Pyspark Dataframe 将函数应用于两列

转载 作者:行者123 更新时间:2023-12-01 17:50:09 24 4
gpt4 key购买 nike

假设我有两个 PySpark DataFrame df1df2

df1=   'a' 
1
2
5

df2= 'b'
3
6

我想找到最接近的df2['b']每个值 df1['a'] ,并将最接近的值添加为 df1 中的新列.

换句话说,对于每个值 xdf1['a'] ,我想找一个y实现 min(abx(x-y))对于所有人y in df2['b'] (注:可以假设只有一个 y 可以达到最小距离),结果为

'a'    'b'
1 3
2 3
5 6

我尝试使用以下代码首先创建距离矩阵(在找到实现最小距离的值之前):

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())

sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)

这给出了

Column<PythonUDF#dist(a,b)>

然后我尝试了

sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))

它永远运行而不给出错误/输出。

我的问题是:

  1. 由于我是 Spark 新手,我构造输出 DataFrame 的方法有效吗? (我的方法是首先为所有 ab 值创建一个距离矩阵,然后找到 min 一个)
  2. 我的代码的最后一行有什么问题以及如何修复它?

最佳答案

从你的第二个问题开始 - 你只能将 udf 应用于现有的数据帧,我认为你正在考虑这样的事情:

>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
| a| b|distance|
+---+---+--------+
| 1| 3| 2|
| 1| 6| 5|
| 2| 3| 1|
| 2| 6| 4|
| 5| 3| 2|
| 5| 6| 1|
+---+---+--------+

但是有一种更有效的方法来应用此距离,即使用内部 abs:

>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b))

然后你可以通过计算找到匹配的数字:

>>> distances = df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+
| a| b|
+---+---+
| 5| 6|
| 1| 3|
| 2| 3|
+---+---+

关于Pyspark Dataframe 将函数应用于两列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40389433/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com