gpt4 book ai didi

apache-spark - 向 Spark DataFrame 添加一列并为其计算值

转载 作者:行者123 更新时间:2023-12-04 05:24:55 26 4
gpt4 key购买 nike

我有一个 CSV 文档正在加载到包含纬度和经度列的 SQLContext 中。

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter","\t").schema(customSchema).load(inputFile);

CSV 示例
metro_code, resolved_lat, resolved_lon
602, 40.7201, -73.2001

我试图找出添加新列并计算每一行的 GeoHex 的最佳方法。使用 geohex 包可以轻松散列纬度和经度。我想我需要运行 parallelize 方法,或者我已经看到一些将函数传递给 withColumn 的示例。

最佳答案

用 UDF 包装所需的函数应该可以解决问题:

import org.apache.spark.sql.functions.udf
import org.geohex.geohex4j.GeoHex

val df = sc.parallelize(Seq(
(Some(602), 40.7201, -73.2001), (None, 5.7805, 139.5703)
)).toDF("metro_code", "resolved_lat", "resolved_lon")

def geoEncode(level: Int) = udf(
(lat: Double, long: Double) => GeoHex.encode(lat, long, level))

df.withColumn("code", geoEncode(9)($"resolved_lat", $"resolved_lon")).show
// +----------+------------+------------+-----------+
// |metro_code|resolved_lat|resolved_lon| code|
// +----------+------------+------------+-----------+
// | 602| 40.7201| -73.2001|PF384076026|
// | null| 5.7805| 139.5703|PR081331784|
// +----------+------------+------------+-----------+

关于apache-spark - 向 Spark DataFrame 添加一列并为其计算值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34774543/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com