- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了相当奇怪的问题。
我有一个 UDF(),计算 2 点之间的距离
private static UDF4<Double, Double, Double, Double, Double> calcDistance =
(UDF4<Double, Double, Double, Double, Double>) (lat, lon, meanLat, meanLon) ->
GeoUtils.calculateDistance(lat, lon, meanLat, meanLon);
UDF注册
spark.udf().register("calcDistance", calcDistance, DataTypes.DoubleType);
我有一个以下结构的数据框(这个 DF 是通过 hpan
字段连接 2 个 DF 的结果)
root
|-- hpan: string (nullable = true)
|-- atmid: string (nullable = true)
|-- reqamt: long (nullable = true)
|-- mcc_code: string (nullable = true)
|-- utime: string (nullable = true)
|-- udate: string (nullable = true)
|-- address_city: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- gmt_msk_offset: integer (nullable = true)
|-- utimeWithTZ: timestamp (nullable = true)
|-- weekDay: integer (nullable = true)
|-- location_type: string (nullable = true)
|-- mean_lat: double (nullable = true)
|-- mean_lon: double (nullable = true)
所以我想要的是添加一个距离在 (lat,lon) 和 (mean_lat,mean_lon) 之间的列;
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude"), col("longitude"),
col("mean_lat"), col("mean_lon")));
它在 Spark 2.2 上运行良好,但在 v2.3 上开始失败异常(exception)的是
Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) 'mean_lon,'mean_lat,'longitude,'latitude missing from gmt_msk_offset#147,utime#3,longitude#146,addre
ss_city#141,udate#29,mean_lon#371,weekDay#230,reqamt#4L,latitude#145,mean_lat#369,location_type#243,hpan#1,utimeWithTZ#218,mcc_code#14,atmid#9 in operator 'Project [hpan#1, atmid#9, reqamt# 4L, mcc_code#14, utime#3, udate#29, address_city#141, latitude#145, longitude#146, gmt_msk_offset#147, utimeWithTZ#218, weekDay#230, location_type#243, mean_lat#369, mean_lon#371, 'calcDist
ance('latitude, 'longitude, 'mean_lat, 'mean_lon) AS distance#509]. Attribute(s) with the same name appear in the operation: mean_lon,mean_lat,longitude,latitude. Please check if the right
attribute(s) are used.;;
我尝试像这样在 UDF() 内的 cols 添加别名
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude").as("a"), col("longitude").as("b"), col("mean_lat").as("c"), col("mean_lon").as("d")));
或者将此列包装在 scala 序列中
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",JavaConverters.collectionAsScalaIterableConverter(Arrays.asList
(col("latitude"), col("longitude"), col("mean_lat"), col("mean_lon")))
.asScala()
.toSeq()));
这些尝试都不能解决问题。
也许有人知道这个问题的解决方法?
转换流程是这样的
ParentDF -> childDF1(as parentDF.groupBy().agg(mean())), childDF2(parentDF.filter('condition')) -> svWithCoordsTzAndDistancesDF (join childDF1 and childDF2).
我认为问题可能出在为此流程构建的执行计划中......
最佳答案
这是某种魔法。当我指定列的数据框并添加 select("*")
- 它起作用了。如果有人能解释一下 - 我将非常感激
df = df.select("*")
.withColumn("distance", callUDF("calcDistance",
df.col("latitude"),
df.col("longitude"),
df.col("mean_lat"),
df.col("mean_lon")))
.toDF();
关于java - withColumn() 内的 AnalysisException callUDF(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50345387/
我有一个数据框(mydf),如下所示: +---+---+---+---+ | F1| F2| F3| F4| +---+---+---+---+ | t| y4| 5|1.0| | x| y
我在spark中写了一些代码如下: val df = sqlContext.read.json("s3n://blah/blah.gz").repartition(200) val newdf = d
我有一个包含 2 列的数据框:account_id 和 email_address,现在我想再添加一列 updated_email_address,我称之为email_address 上的函数以获取
我注意到我的代码存储库警告我在 for/while 循环中使用 withColumn 是一种反模式。为什么不推荐这样做?这不是PySpark API的正常使用吗? 最佳答案 我们在实践中注意到,在 f
在我使用 RDD 进行了几个项目之后,我开始使用数据集。我正在使用 Java 进行开发。 据我了解,列是不可变的 - 列没有映射函数,映射列的标准方法是使用 withColumn 添加列。 我的问题是
这个问题已经有答案了: Multiple condition filter on dataframe (2 个回答) 已关闭 3 年前。 我是 Pyspark 新手 我有这段代码: df2 = df.
我有一个 df,其中包含一列 type,并且我有两个列表 women = ['0980981', '0987098'] men = ['1234567', '4567854'] 现在我想根据 type
我正在为某些要求创建一个空数据框,当我在其上调用 withColumn 函数时,我得到了列,但数据为空,如下所示- schema = StructType([]) df = sqlContext.cr
我有一个包含列“col1”和“col2”的数据框 df。我想创建第三列,它使用其中一列作为指数函数。 df = df.withColumn("col3", 100**(df("col1")))*df(
我有一些使用 的原型(prototype) Scala 代码 .withColumn("column_name_dod", $"column_name".getItem("dod")) 我知道with
如何在多个 when 条件下实现以下目标。 from pyspark.sql import functions as F df = spark.createDataFrame([(5000, 'US'
当多个 withColumn 时,Spark 是执行一次还是多次传递数据?函数是链式的? 例如: val dfnew = df.withColumn("newCol1", f1(col("a")))
我正在使用 Spark 和 PySpark。我正在尝试实现等效于以下伪代码的结果: df = df.withColumn('new_column', IF fruit1 == fruit2 T
我有一个 DataFrame,它有多个列,其中一些是结构。像这样的事情 root |-- foo: struct (nullable = true) | |-- bar: string (n
今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了相当奇怪的问题。 我有一个 UDF(),计算 2 点之间的距离 private static UDF4 calcDistan
我有一个用户定义函数的问题,该函数是为连接来自一个数据帧的值而构建的,该数据帧与来自另一个数据帧的索引值相匹配。 以下是我尝试匹配的简化数据框: a_df: +-------+------+ | in
我有这样一个数据集: a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"]) 我想要一个数据集,它添加一
我有一个具有以下结构的 Spark 数据框。 bodyText_token 具有标记(已处理/单词集)。我有一个定义关键字的嵌套列表 root |-- id: string (nullable =
如果我有一个名为 df 的 DataFrame,它看起来像: +---+---+ | a1+ a2| +---+---+ |foo|bar| |N/A|baz| +---+---+ 我期望: val
我使用的是 UCI 的成人年收入。 我有一个数据框,其中一列中有一个类别变量,我想将其分组为不同的类别(一些常见的特征工程)。 df.groupBy('education').count().show
我是一名优秀的程序员,十分优秀!