gpt4 book ai didi

java - 如何在不指定每一列的情况下将整行作为参数传递给 Spark (Java) 中的 UDF?

转载 作者:行者123 更新时间:2023-12-05 07:42:54 25 4
gpt4 key购买 nike

我有这段 java 代码,其中 spark UDF 将行作为输入并返回行。还有一个广播变量,它是一个 HashMap。

UDF 所做的只是检查广播 HashMap 是否包含 rowKey,如果包含,则返回一个新行,其中包含输入行中的一些现有值和广播 HashMap 中的一些更新值。如果不是,则按原样返回输入行。我这样做是因为我想根据 HashMap 中的值更新行列值。这是代码:

广播变量

final Broadcast<HashMap<String, HashMap<String, String>>> broadcastVariable = jsc.broadcast(someHashMap);

UDF 定义

UDF1<Row, Row> myUDF = new UDF1<Row, Row> () {
@Override
public Row call(Row inputRow) {

String myKey = inputRow.getString(3);

if (broadcastVariable.value().containsKey(myKey)){
Map<String, String> valuesToUpdate = broadcastVariable.value().get(myKey);

String col1 = inputRow.getString(0);
String col2 = inputRow.getString(1);
String col3 = inputRow.getString(2);

for (Map.Entry<String, String> entry : valuesToUpdate.entrySet())
{
String columnName = entry.getKey();

switch(columnName) {
case "col1" :
col1 = entry.getValue();
break;
case "col2" :
col2 = entry.getValue();
break;
case "col3" :
col3 = entry.getValue();
break;
}
}
return RowFactory.create(col1,col2,col3,myKey);

}
return inputRow;
}
};

UDF注册

hiveContext.udf().register("myUDF", myUDF, DataTypes.createStructType(DF1.schema().fields()));

UDF调用

DataFrame DF2 = DF1.select(org.apache.spark.sql.functions.callUDF
("myUDF", org.apache.spark.sql.functions.struct(DF1.col("col1"),
DF1.col("col2"),
DF1.col("col3"),
DF1.col("myKey"))));

我有以下问题,

  1. 如何在不一一列出的情况下将数据框中的所有列传递给 UDF?我问这个的原因是实际的 DataFrame 有超过 50 列。我看到这个example ,但无法让它在 Java 中运行。

  2. 有没有一种方法可以在 UDF 中按名称访问行列?现在我正在使用 getString(int)。

  3. UDF 输出是一个名为 myUDF(struct(col1,col2,col3,myKey)) 的结构。它有 50 多列,真的很长。我如何为它取别名?

感谢任何帮助!

最佳答案

TL;DR 使用 Dataset.map (并用 map 函数替换 UDF)。


How can I pass all the columns in the dataframe to the UDF without listing them one by one?

dataframe.schema.fieldNames

参见 Dataset API。

Is there a way I can access the row columns by name within the UDF?

引用 Row.fieldIndex 的 scaladoc :

fieldIndex(name: String): Int Returns the index of a given field name.

并使用索引。

It gets really long with 50+ columns. How can I alias this?

看起来您的代码会受益于一些重构和组合。在单个管道中处理 50 个字段可能会有点笨拙。

关于java - 如何在不指定每一列的情况下将整行作为参数传递给 Spark (Java) 中的 UDF?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44115422/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com