gpt4 book ai didi

java - 如何通过 Spark Listener 检查 Databricks 上是否安装了类?

转载 作者:行者123 更新时间:2023-12-02 06:10:33 26 4
gpt4 key购买 nike

在 Databricks 上使用 Spark 监听器,我试图查看是否安装了给定的类,但由于 Databricks 安装包的方式,监听器无法在集群启动后看到安装的包。

在 Java Spark 监听器中,是否有更好的方法来识别通过 Databricks 的库 API/UI 安装的包安装了类?

摘要

  • 使用通过 cluster-scoped init script 安装的 SparkListener在 Databricks 上。
  • 使用 ClassLoader在监听器中检查是否安装了给定的类。
  • 在 Apache Spark 上
    • 如果监听器是通过 --packages--jars 安装的,则适用于 Apache Spark。
    • 如果监听器是通过 --conf spark.driver.extraClassPath 安装的,并且所需的库是通过 --packages- 安装的,则在 Apache Spark 上失败-jars.
  • 在 (Azure) Databricks 上
    • 如果库已存在于 /datbaricks/jars 目录(即 $CLASSPATH 目录)中,则适用于 Databricks。
    • 如果通过 Libraries API/UI 安装库,则在 Databricks 上失败(以这种方式安装的 jar 似乎最终位于 /local_disk0/tmp)。

Spark 监听器详细信息

使用 Apache Spark,我可以通过 --packages + --conf spark.extraListeners=listener.MyListener 安装 Spark Listener 并利用 ClassLoader在 Spark Listener 中检查是否有通过 --jars--packages 或类路径安装的任何类。检测类是否存在的监听器如下所示。

public class MyListener extends org.apache.spark.scheduler.SparkListener {

private static final Logger log = LoggerFactory.getLogger("MyLogger");
@Override
public void onJobStart(SparkListenerJobStart jobStart) {
try{
log.info("Trying LogicalRelation");
MyListener.class.getClassLoader().loadClass(
"org.apache.spark.sql.execution.datasources.LogicalRelation"
);
log.info("Got logical relation");
}
catch (ClassNotFoundException e){
log.info("Couldn't find LogicalRelation");
}

try{
log.info("Trying org.apache.iceberg.catalog.Catalog");
MyListener.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog");
log.info("Got org.apache.iceberg.catalog.Catalog!!!!");
} catch(ClassNotFoundException e){
log.info("Could not get org.apache.iceberg.catalog.Catalog");
}

try{
log.info("Trying Kusto DefaultSource");
MyListener.class.getClassLoader().loadClass("com.microsoft.kusto.spark.datasource.DefaultSource");
log.info("Got Kusto DefaultSource!!!!");
} catch(ClassNotFoundException e){
log.info("Could not get Kusto DefaultSource");
}
}
}

在 Databricks 上,监听器是通过初始化脚本安装的,如下所示:

cp -f /dbfs/databricks/custom/listener.jar /mnt/driver-daemon/jars || { echo "Error"; exit 1;}

cat << 'EOF' > /databricks/driver/conf/customer-listener.conf
[driver] {
"spark.extraListeners" = "listener.MyListener"
}
EOF

此安装方法与其他公共(public)监听器类似:

尝试使用 URLClassLoader

看来 Scala ClassLoader doesn't play nicely with a Java classloader 。我尝试添加 URLClassLoader根据另一个SO post on setting a different classloader但 ClassNotFoundException 仍在继续。

但是,Databricks Interactive 笔记本上的这段代码确实成功找到了我的测试类

URLClassLoader ucl;
try {
log.info("URL Class Loader Attempt V3");
File file = new File("/local_disk0/tmp/");
URL classUrl = file.toURI().toURL();
URL[] urls = new URL[] { classUrl };
System.out.println(urls.toString());
ucl = new URLClassLoader(urls, getClass().getClassLoader());
ucl.loadClass("com.microsoft.kusto.spark.datasource.DefaultSource");
try {
ucl.close();
} catch (IOException e) {
log.error("Failed to close url classloader");
}
log.info("GOT KustoLIBRARY with URL Class Loader!");
} catch (ClassNotFoundException e) {
// Still hitting this one
log.info("Could not get Kusto Library with URLClassLoader");
} catch (MalformedURLException e) {
log.info("The URL was malformed");
}

Databricks 库详细信息

对于 Databricks,大多数用户使用 Libraries在 Spark 启动后安装 jar 的功能,并允许用户通过 Databricks UI 或 API 轻松安装 jar。

当使用上述监听器和 ClassLoader 时,它将始终为通过库 API 安装的包引发 ClassNotFoundException

在 Databricks 日志中,我可以看到所需的 jar 正在安装在日志中。

22/07/14 13:32:34 INFO DriverCorral: [Thread 123] AttachLibraries - candidate libraries: List(JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:34 INFO DriverCorral: [Thread 123] AttachLibraries - new libraries to install (including resolved dependencies): List(JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE))
22/07/14 13:32:37 INFO SharedDriverContext: [Thread 123] attachLibrariesToSpark JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:37 INFO LibraryDownloadManager: Downloading a library that was not in the cache: JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:37 INFO LibraryDownloadManager: Attempt 1: wait until library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) is downloaded
22/07/14 13:32:37 INFO LibraryDownloadManager: Downloaded library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) as local file /local_disk0/tmp/addedFile2043314239110388521kusto_spark_3_0_2_12_3_0_0-6add9.jar in 39 milliseconds
22/07/14 13:32:37 INFO SharedDriverContext: Successfully saved library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) to local file /local_disk0/tmp/addedFile2043314239110388521kusto_spark_3_0_2_12_3_0_0-6add9.jar
22/07/14 13:32:37 INFO SharedDriverContext: Successfully attached library dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar to Spark
22/07/14 13:32:37 INFO LibraryState: [Thread 123] Successfully attached library dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar

如果我要将所需的 jar/包及其所有依赖项安装到 /databricks/jars 文件夹中,Spark 监听器可以成功检测到这些包已安装。 Confirmed by Databricks Employee on SO 。但是,考虑到 Databricks 库功能,这并不是常见做法。

因此,这一切似乎都归结为:如何让 Databricks 交互式或作业集群上的主类加载器识别通过 Spark 应用程序上下文安装的库(如库 API/UI 中所示)?

感谢您的见解!

最佳答案

使用Thread.currentThread().getContextClassLoader().loadClass("<class_name>")而不是MyListener.class.getClassLoader().loadClass("<class_name>")在这种情况下似乎可以按要求工作。

Apache Spark implementation还使用Thread.currentThread().getContextClassLoader .

以下 Stack Overflow 帖子有助于理解这两种方法之间的差异:

这个article似乎还提供了有关 Java 中不同类型的类加载器的更多信息。

希望这有帮助!

关于java - 如何通过 Spark Listener 检查 Databricks 上是否安装了类?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72985892/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com