gpt4 book ai didi

scala - 如何向 DataFrame 添加新的 Struct 列

转载 作者:行者123 更新时间:2023-11-29 02:43:49 25 4
gpt4 key购买 nike

我目前正在尝试从 MongoDB 中提取数据库,并使用 Spark 通过 geo_points 摄取到 ElasticSearch。

Mongo 数据库有纬度和经度值,但 ElasticSearch 要求将它们转换为 geo_point 类型。

Spark 中是否有一种方法可以将 latlon 列复制到一个新列,该列是 arraystruct ?

感谢任何帮助!

最佳答案

我假设您从像这样的某种平面模式开始:

root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)

首先让我们创建示例数据:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) ::
StructField("key", StringType, false) ::Nil)

val df = sqlContext.createDataFrame(rdd, schema)

一种简单的方法是使用 udf 和案例类:

case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
withColumn("location", makeLocation(col("lat"), col("long"))).
drop("lat").
drop("long")

dfRes.printSchema

然后我们得到

root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
| |-- lat: double (nullable = false)
| |-- long: double (nullable = false)

一个困难的方法是转换数据并在之后应用模式:

val rddRes = df.
map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(
StructField("key", StringType, false) ::
StructField("location", StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) :: Nil
), true) :: Nil
)

sqlContext.createDataFrame(rddRes, schemaRes).show

我们得到了预期的输出

+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+

从头开始创建嵌套模式可能很乏味,所以如果可以的话,我会推荐第一种方法。如果您需要更复杂的结构,它可以很容易地扩展:

case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))

df.
withColumn("pin", makePin(col("lat"), col("long"))).
drop("lat").
drop("long").
printSchema

我们得到了预期的输出:

root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: double (nullable = false)
| | |-- long: double (nullable = false)

不幸的是,您无法控制 nullable 字段,因此如果对您的项目很重要,您必须指定架构。

终于可以使用1.4引入的struct函数了:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))

关于scala - 如何向 DataFrame 添加新的 Struct 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31615657/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com