gpt4 book ai didi

java - 无法理解 Spark 中的 UDF,尤其是 Java 中的 UDF

转载 作者:行者123 更新时间:2023-12-02 03:01:47 29 4
gpt4 key购买 nike

我正在尝试根据另一列的值在 Spark 数据集中创建一个新列。另一列的值作为键在 json 文件中搜索,返回的值是用于新列的值。

这是我尝试过的代码,但它不起作用,而且我不确定 UDF 是如何工作的。在这种情况下,如何使用 withColumn 或 udf 添加列?

Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file path");
Object obj = new JSONParser().parse(new FileReader("json path"));
JSONObject jo = (JSONObject) obj;

df = df.withColumn("cluster", functions.lit(jo.get(df.col("existing col_name")))));

任何帮助将不胜感激。提前致谢!

最佳答案

Spark 允许您使用 udf 函数创建自定义用户定义函数 (UDF)。

以下是如何定义 UDF 的 scala 片段。

val obj = new JSONParser().parse(new FileReader("json path"));
val jo = obj.asInstanceOf[JSONObject];

def getJSONObject(key: String) = {
jo.get(key)
}

定义函数后,您可以将其转换为 UDF,如下所示:

 val getObject = udf(getJSONObject _)

有两种使用 UDF 的方法。

  1. df.withColumn("cluster", lit(getObject(col("existing_col_name"))))

  2. 如果您使用spark sql,则必须在使用之前在sqlContext中注册您的udf。

    spark.sqlContext.udf.register("get_object", getJSONObject _)

    然后您可以将其用作

    spark.sql("从 some_table 中选择 get_object(existing_column)")

其中,使用哪一个完全是主观的。

关于java - 无法理解 Spark 中的 UDF,尤其是 Java 中的 UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52711450/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com