
java - How do I create a Spark UDF in Java that accepts an array of strings?


This question has been asked here for Scala, but that doesn't help me because I'm working with the Java API. I've thrown everything but the kitchen sink at it, so here is my approach:

List<String> sourceClasses = new ArrayList<String>();
// Add elements
List<String> targetClasses = new ArrayList<String>();
// Add elements

dataset = dataset.withColumn("Transformer", callUDF(
        "Transformer",
        lit((String[]) sourceClasses.toArray())
                .cast(DataTypes.createArrayType(DataTypes.StringType)),
        lit((String[]) targetClasses.toArray())
                .cast(DataTypes.createArrayType(DataTypes.StringType))
));

And here is my UDF declaration:

public class Transformer implements UDF2<Seq<String>, Seq<String>, String> {

    // @SuppressWarnings("deprecation")
    public String call(Seq<String> sourceClasses, Seq<String> targetClasses)
            throws Exception {
        // ...
    }
}

When I run the code, execution never makes it past the UDF call, which is expected since I cannot get the types to match. Please help me with this.

EDIT

I tried the solution suggested by @Oli. However, I get the following exception:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$261: (array<string>, array<string>) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
at com.esrx.dqm.uuid.UUIDTransformerEngine$1.call(UUIDTransformerEngine.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$261.apply(UDFRegistration.scala:774)
... 22 more

This line in particular seems to point to the problem:

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
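As far as I can tell, this means Spark hands the UDF a scala.collection.mutable.WrappedArray at runtime, while my signature still declares scala.collection.immutable.Seq. A rough, untested sketch of what I suspect my Transformer signature would have to become (the conversion back to a Java list is only hinted at here):

import scala.collection.mutable.WrappedArray;

public class Transformer implements UDF2<WrappedArray<String>, WrappedArray<String>, String> {

    public String call(WrappedArray<String> sourceClasses, WrappedArray<String> targetClasses)
            throws Exception {
        // Presumably convert the arguments to java.util.List via
        // scala.collection.JavaConverters before applying the existing logic.
        return null; // placeholder
    }
}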

Best Answer

From what I can tell from your UDF's type parameters, you are trying to create a UDF that takes two arrays as input and returns a string.

In Java, this is somewhat painful but manageable.

Let's say you want to join the two arrays and link them with the word AND. You could define the UDF as follows:

UDF2<WrappedArray<String>, WrappedArray<String>, String> my_udf2 =
        new UDF2<WrappedArray<String>, WrappedArray<String>, String>() {
    public String call(WrappedArray<String> a1, WrappedArray<String> a2) throws Exception {
        // Convert the Scala WrappedArrays into Java collections
        ArrayList<String> l1 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a1)
                .asJavaCollection());
        ArrayList<String> l2 = new ArrayList<>(JavaConverters
                .asJavaCollectionConverter(a2)
                .asJavaCollection());
        // Join each list with commas and link the two results with " AND "
        return l1.stream().collect(Collectors.joining(",")) +
                " AND " +
                l2.stream().collect(Collectors.joining(","));
    }
};

Note that you need to use Scala's WrappedArray in the method signature and convert the arguments with JavaConverters in the method body so that you can work with them in Java. Just in case, here are the required imports:

import scala.collection.mutable.WrappedArray;
import scala.collection.JavaConverters;
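
For completeness (not spelled out in the original answer), the snippets above and below also rely on a few standard Spark and JDK imports; something along these lines should cover them, though the exact set may vary with your Spark version:

import static org.apache.spark.sql.functions.*;   // col, lit, udf, ...

import java.util.ArrayList;
import java.util.stream.Collectors;

import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;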

You can then register the UDF to use it with Spark. To demonstrate it, I created a sample dataframe with an "id" column and two dummy arrays. Note that it also works with the lit function, as you were trying to do in your question:

spark.udf().register("my_udf2", my_udf2, DataTypes.StringType);

String[] data = {"abcd", "efgh", "ijkl"};

spark.range(3)
        .withColumn("id", col("id").cast("string"))
        .withColumn("array", functions.array(col("id"), col("id")))
        .withColumn("string_of_arrays",
                functions.callUDF("my_udf2", col("array"), lit(data)))
        .show(false);

which yields:

+---+------+----------------------+
|id |array |string_of_arrays      |
+---+------+----------------------+
|0  |[0, 0]|0,0 AND abcd,efgh,ijkl|
|1  |[1, 1]|1,1 AND abcd,efgh,ijkl|
|2  |[2, 2]|2,2 AND abcd,efgh,ijkl|
+---+------+----------------------+

In Spark >= 2.3, you can also do it directly like this:

UserDefinedFunction my_udf2 = udf(
        (WrappedArray<String> s1, WrappedArray<String> s2) -> "some_string",
        DataTypes.StringType
);

df.select(my_udf2.apply(col("a1"), col("a2"))).show(false);
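
To make the lambda version less of a stub, here is a hedged sketch that reuses the JavaConverters conversion from above inside the lambda. The name join_arrays is just illustrative, and the explicit cast to UDF2 is an assumption on my part to sidestep the overload ambiguity between UDF2 and Scala's Function2 that some compiler/Spark combinations report:

UserDefinedFunction join_arrays = udf(
        (UDF2<WrappedArray<String>, WrappedArray<String>, String>) (s1, s2) -> {
            // Convert both WrappedArrays to Java lists before joining them
            ArrayList<String> l1 = new ArrayList<>(JavaConverters
                    .asJavaCollectionConverter(s1).asJavaCollection());
            ArrayList<String> l2 = new ArrayList<>(JavaConverters
                    .asJavaCollectionConverter(s2).asJavaCollection());
            return String.join(",", l1) + " AND " + String.join(",", l2);
        },
        DataTypes.StringType
);

df.select(join_arrays.apply(col("a1"), col("a2"))).show(false);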

Regarding "java - How do I create a Spark UDF in Java that accepts an array of strings?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59026439/
