gpt4 book ai didi

java - withColumn() 内的 AnalysisException callUDF()

转载 作者:太空宇宙 更新时间:2023-11-04 10:27:43 25 4
gpt4 key购买 nike

今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了相当奇怪的问题。

我有一个 UDF(),计算 2 点之间的距离

private static UDF4<Double, Double, Double, Double, Double> calcDistance =
(UDF4<Double, Double, Double, Double, Double>) (lat, lon, meanLat, meanLon) ->
GeoUtils.calculateDistance(lat, lon, meanLat, meanLon);

UDF注册

spark.udf().register("calcDistance", calcDistance, DataTypes.DoubleType);

我有一个以下结构的数据框(这个 DF 是通过 hpan 字段连接 2 个 DF 的结果)

root
|-- hpan: string (nullable = true)
|-- atmid: string (nullable = true)
|-- reqamt: long (nullable = true)
|-- mcc_code: string (nullable = true)
|-- utime: string (nullable = true)
|-- udate: string (nullable = true)
|-- address_city: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- gmt_msk_offset: integer (nullable = true)
|-- utimeWithTZ: timestamp (nullable = true)
|-- weekDay: integer (nullable = true)
|-- location_type: string (nullable = true)
|-- mean_lat: double (nullable = true)
|-- mean_lon: double (nullable = true)

所以我想要的是添加一个距离在 (lat,lon) 和 (mean_lat,mean_lon) 之间的列;

svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude"), col("longitude"),
col("mean_lat"), col("mean_lon")));

它在 Spark 2.2 上运行良好,但在 v2.3 上开始失败异常(exception)的是

Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) 'mean_lon,'mean_lat,'longitude,'latitude missing from gmt_msk_offset#147,utime#3,longitude#146,addre
ss_city#141,udate#29,mean_lon#371,weekDay#230,reqamt#4L,latitude#145,mean_lat#369,location_type#243,hpan#1,utimeWithTZ#218,mcc_code#14,atmid#9 in operator 'Project [hpan#1, atmid#9, reqamt# 4L, mcc_code#14, utime#3, udate#29, address_city#141, latitude#145, longitude#146, gmt_msk_offset#147, utimeWithTZ#218, weekDay#230, location_type#243, mean_lat#369, mean_lon#371, 'calcDist
ance('latitude, 'longitude, 'mean_lat, 'mean_lon) AS distance#509]. Attribute(s) with the same name appear in the operation: mean_lon,mean_lat,longitude,latitude. Please check if the right
attribute(s) are used.;;

我尝试像这样在 UDF() 内的 cols 添加别名

svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude").as("a"), col("longitude").as("b"), col("mean_lat").as("c"), col("mean_lon").as("d")));

或者将此列包装在 scala 序列中

svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",JavaConverters.collectionAsScalaIterableConverter(Arrays.asList
(col("latitude"), col("longitude"), col("mean_lat"), col("mean_lon")))
.asScala()
.toSeq()));

这些尝试都不能解决问题。

也许有人知道这个问题的解决方法?

转换流程是这样的

ParentDF -> childDF1(as parentDF.groupBy().agg(mean())), childDF2(parentDF.filter('condition')) -> svWithCoordsTzAndDistancesDF (join childDF1 and childDF2). 

我认为问题可能出在为此流程构建的执行计划中......

最佳答案

这是某种魔法。当我指定列的数据框并添加 select("*") - 它起作用了。如果有人能解释一下 - 我将非常感激

df = df.select("*")
.withColumn("distance", callUDF("calcDistance",
df.col("latitude"),
df.col("longitude"),
df.col("mean_lat"),
df.col("mean_lon")))
.toDF();

关于java - withColumn() 内的 AnalysisException callUDF(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50345387/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com