gpt4 book ai didi

scala - 如何将任何新库(如 spark-sftp)添加到我的 Pyspark 代码中?

转载 作者:行者123 更新时间:2023-12-05 08:08:09 24 4
gpt4 key购买 nike

当我尝试在我的 Spark conf 中设置包依赖项“spark-sftp”时,我得到 ClassNotFoundException。但是当我使用以下命令执行脚本时它有效:

spark-submit --packages com.springml:spark-sftp_2.11:1.1.1 test.py

下面是我的代码。有人能告诉我如何执行我的 pyspark 脚本吗 不将包作为参数传递给 spark-submit?

import sys
import datetime
import pyspark
from pyspark.sql import *
from pyspark.sql import SparkSession, SQLContext, Row, HiveContext
from pyspark import SparkContext

#Create new config
conf = (pyspark.conf.SparkConf()
.set("spark.driver.maxResultSize", "16g")
.set("spark.driver.memory", "20g")
.set("spark.executor.memory", "20g")
.set("spark.executor.cores", "5")
.set("spark.shuffle.service.enabled", "true")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.initialExecutors", "24")
.set("spark.dynamicAllocation.minExecutors", "6")
.set("spark.submit.deployMode", "client")
.set("spark.jars.packages", "com.springml:spark-sftp_2.11:1.1.1")
.set("spark.python.worker.memory", "4g")
.set("spark.default.parallelism", "960")
.set("spark.executor.memoryOverhead", "4g")
.setMaster("yarn-client"))


# Create new context
spark = SparkSession.builder.appName("AppName").config(conf=conf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")


df = spark.read.format("com.springml.spark.sftp").option("host", "HOST").option("username", "HOSTNAME").option("password", "pass").option("fileType", "csv").option("inferSchema", "true").load("/test/sample.csv")

输出::java.lang.ClassNotFoundException:找不到数据源:com.springml.spark.sftp。请在 http://spark.apache.org/third-party-projects.html 找到包裹 在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174) 在 sun.reflect.NativeMethodAccessorImpl.invoke0( native 方法) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748)引起:java.lang.ClassNotFoundException:com.springml.spark.sftp.DefaultSource

最佳答案

提交 spark 作业时,您可以指定要安装的包。对于这个,您可以将此 Maven 依赖项指定为:

> $SPARK_HOME/bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.3

关于scala - 如何将任何新库(如 spark-sftp)添加到我的 Pyspark 代码中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52067494/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com