
java - Why is my UDF (in "cluster" mode) executed locally (in the driver) instead of on the workers?


Two Spark workers are running, and the following code is executed (a TestNG test):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.testng.annotations.Test;

public class UdfTest {

    @Test
    public void simpleUdf() {
        SparkConf conf = new SparkConf()
                .set("spark.driver.host", "localhost")
                .setMaster("spark://host1:7077")
                .set("spark.jars", "/home/.../myjar.jar")
                .set("spark.submit.deployMode", "cluster")
                .setAppName("RESTWS ML");

        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        List<Row> rows = new ArrayList<>();
        for (long i = 0; i < 10; i++) {
            rows.add(RowFactory.create("cr" + i));
        }

        Dataset<Row> textAsDataset = sparkSession.createDataFrame(rows,
                new StructType(new StructField[] {
                        new StructField("contentRepositoryUUID", DataTypes.StringType, false, Metadata.empty()) }));

        sparkSession.udf().register("myUdf",
                (UDF1<String, String>) (col1) -> myUdf(col1), DataTypes.StringType);

        Dataset<Row> rowDataset = textAsDataset.withColumn("text", functions.callUDF("myUdf",
                textAsDataset.col("contentRepositoryUUID")
        ));
        rowDataset.show();
    }

    private String myUdf(String col1) {
        new Exception().printStackTrace();
        return col1 + " changed";
    }
}

The dataset gets created, and I expected the Java function myUdf() to be called from the worker Java processes, but it is called from the driver thread instead; the stack trace originates from the rowDataset.show() line:

java.lang.Exception
at UdfTest.myUdf(UdfTest.java:53)
at UdfTest.lambda$simpleUdf$45ca9450$1(UdfTest.java:44)
at org.apache.spark.sql.UDFRegistration$$anonfun$259.apply(UDFRegistration.scala:759)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:108)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:107)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:152)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:92)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1364)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1359)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:248)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1359)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
at UdfTest.simpleUdf(UdfTest.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:648)
at org.testng.TestRunner.run(TestRunner.java:505)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
at org.testng.TestNG.runSuites(TestNG.java:1028)
at org.testng.TestNG.run(TestNG.java:996)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)

How does Spark decide whether a UDF can be called from the workers?

The strange thing is that this already worked once, but now that I am trying to reproduce this "distributed UDF" scenario something has changed and I cannot. Looking at the Spark DEBUG logs did not help me, unfortunately.

Best Answer

While the stack trace indeed originates from the show() call, the key is actually this part:

...
HERE --> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
...
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
...

You are still in the query optimization phase, which is performed by Catalyst in the driver.

The reason is a scarcely documented feature of Spark: Datasets created from local collections with SparkSession.createDataFrame() (SparkSession.createDataset() / Seq.toDF() in Scala) are mere local relations inside the driver and are not really distributed:

scala> val df = (0 to 5).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.queryExecution.analyzed
res45: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107]

scala> df.isLocal
res46: Boolean = true

Unlike a Dataset created from an RDD:

scala> val df_from_rdd = sc.parallelize(0 to 5).toDF
df_from_rdd: org.apache.spark.sql.DataFrame = [value: int]

scala> df_from_rdd.queryExecution.analyzed
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]

scala> df_from_rdd.isLocal
res48: Boolean = false
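
For reference, the same distinction can be checked directly from the Java test in the question. A minimal sketch, assuming the textAsDataset built above (isLocal() and queryExecution() are public methods on Dataset):

// Sketch: the Dataset built from the local List<Row> is a LocalRelation in the driver
System.out.println(textAsDataset.isLocal());                    // true
System.out.println(textAsDataset.queryExecution().analyzed());  // LocalRelation [contentRepositoryUUID#...]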

Operations such as Dataset.withColumn() are then executed by the driver itself, as part of the lazy evaluation of the optimized query plan, and never reach the execution stage:

scala> val df_foo = df.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]

scala> df_foo.queryExecution.analyzed
res49: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#107, UDF:myUdf(cast(value#107 as string)) AS foo#146]
+- LocalRelation [value#107]

scala> df_foo.queryExecution.optimizedPlan
java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
...
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
...
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)
...
res50: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#132]
// Notice: the projection is gone, merged into the local relation

scala> df_foo.queryExecution.optimizedPlan
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#163]
// Notice: no stack trace this time

Again unlike what happens with a Dataset created from an RDD:

scala> val df_from_rdd_foo = df_from_rdd.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_from_rdd_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]

scala> df_from_rdd_foo.queryExecution.optimizedPlan
res52: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#112, UDF:myUdf(cast(value#112 as string)) AS foo#135]
+- SerializeFromObject [input[0, int, false] AS value#112]
   +- ExternalRDD [obj#111]

This produces no stack trace in the executor's stderr, i.e. the UDF is not invoked. On the other hand:

scala> df_from_rdd_foo.show()
+-----+---------+
|value| foo|
+-----+---------+
| 0|0 changed|
| 1|1 changed|
| 2|2 changed|
| 3|3 changed|
| 4|4 changed|
| 5|5 changed|
+-----+---------+

produces the following stack trace in the executor's stderr:

java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Spark treats local relations like literals, which can also be seen in how they are represented in SQL (code adapted from here):

scala> df.queryExecution.analyzed.collect { case r: LocalRelation => r }.head.toSQL("bar")
res55: String = VALUES (0), (1), (2), (3), (4), (5) AS bar(value)

scala> df_foo.queryExecution.optimizedPlan.collect { case r: LocalRelation => r }.head.toSQL("bar")
res56: String = VALUES (0, '0 changed'), (1, '1 changed'), (2, '2 changed'), (3, '3 changed'), (4, '4 changed'), (5, '5 changed') AS bar(value, foo)

Or as code:

scala> df.queryExecution.analyzed.asCode
res57: String = LocalRelation(
  List(value#107),
  Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
  false
)

scala> df_foo.queryExecution.analyzed.asCode
res58: String = Project(
  List(value#107, UDF:myUdf(cast(value#107 as string)) AS foo#163),
  LocalRelation(
    List(value#107),
    Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
    false
  )
)

scala> df_foo.queryExecution.optimizedPlan.asCode
res59: String = LocalRelation(
  List(value#107, foo#163),
  Vector([0,0 changed], [1,1 changed], [2,2 changed], [3,3 changed], [4,4 changed], [5,5 changed]),
  false
)

Think of what happens as analogous to a Java compiler replacing code like int a = 2 * 3; with int a = 6;, with the actual computation being performed by the compiler.
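
If the goal is for myUdf() to actually run on the executors, one option is to build the Dataset from a distributed RDD rather than from a local List. A minimal sketch based on the test in the question (the variable names are illustrative; rows and the schema are the ones built there):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Parallelize the rows so the Dataset is backed by an RDD, not a LocalRelation
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRdd = jsc.parallelize(rows);
Dataset<Row> distributed = sparkSession.createDataFrame(rowRdd,
        new StructType(new StructField[] {
                new StructField("contentRepositoryUUID", DataTypes.StringType, false, Metadata.empty()) }));

// The plan now contains a LogicalRDD/ExternalRDD instead of a LocalRelation,
// so the UDF is evaluated in executor tasks rather than by Catalyst in the driver.
Dataset<Row> rowDataset = distributed.withColumn("text",
        functions.callUDF("myUdf", distributed.col("contentRepositoryUUID")));
rowDataset.show();

Alternatively (not verified here), Spark 2.4+ allows excluding the optimizer rule that folds local relations, which keeps the UDF out of the optimization phase even for locally created data:

.set("spark.sql.optimizer.excludedRules",
     "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")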

Regarding "java - Why is my UDF (in 'cluster' mode) executed locally (in the driver) instead of on the workers", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61064044/
