- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
因此,我查看了Pyspark,Jupyter上的大量帖子,并设置了内存/核心/执行程序(以及相关的内存)。
但我似乎被困住了-
问题1:我看不到我的机器使用内核或内存。 为什么?我可以对练习者/核心/内存进行一些调整以优化读取文件的速度吗?
问题2:还有什么办法可以让我看到进度条,其中显示了导入了多少文件(spark-monitor似乎没有这样做)。
我正在将33.5gb文件导入pyspark。
机器有112 GB或RAM
8核心/16虚拟核心。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Summaries") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'),
('spark.app.name', 'Spark Updated Conf'),
('spark.driver.cores', '4'), ('spark.executor.cores', '16'),
('spark.driver.memory','90g')])
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")
我假设pyspark即使在读取文件时也正在发挥其魔力(因此我应该看到大量的内核/内存利用率)。但是我没有看到它。帮助!
start = time.time()
df = spark.read.json("../Data/small.json.gz")
end = time.time()
print(end - start)
start = time.time()
df = pa.read_json('../Data/small.json.gz',compression='gzip', lines = True)
end = time.time()
print(end - start)
最佳答案
尽管您问题的答案仅在于以下问题之一,但让我重写您的示例以解释正在发生的事情。
设定设定
首先,您不需要启动和停止上下文来设置配置。从spark 2.0开始,您可以创建spark session ,然后设置config选项。
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("yourAwesomeApp").getOrCreate())
spark.conf.set("spark.executor.memory", "40g")
spark.conf.set("spark.executor.cores", "2")
读取数据
df = spark.read.json("../data/a_very_large_json.json.gz")
和
from pyspark.sql.types import (
StructType,
StringType,
StructField,
)
json_schema = schema = StructType([
StructField('data', StructType([
StructField("field1", StringType(), nullable=False),
StructField("field2", StringType(), nullable=False),
StructField("field3", StringType(), nullable=True),
StructField("field4", StringType(), nullable=True),
StructField("field5", LongType(), nullable=False),
])),
])
df = spark.read.json("../data/a_very_large_json.json.gz", schema=json_schema)
如果提供了该模式,则该指令应几乎立即生效。
df.show()
您可以在配置上设置执行程序实例和核心的数量,但是这些实例的实际使用还取决于您的输入数据和执行的转换/操作。根据您的描述,我假设您正在独立模式下工作,因此默认具有一个执行程序实例(使用所有内核),并且应将执行程序内存设置为使用可用的实例。据我所记得,当您在独立模式下工作时,
spark.executor.instances
会被忽略,而实际的执行程序数取决于可用的内核数和
spark.executor.cores
与 Pandas 比较
关于python - PySpark : Setting Executors/Cores and Memory Local Machine,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63387253/
Issue 2019/05/09 21:50:07.380 +0800 ERROR [ExecutorManager] [Azkaban] No active executors found
我的问题是:使用 Executors.newFixedThreadPool(1)?? 有意义吗? 。在两个线程(main + oneAnotherThread)场景下使用执行器服务是否高效?正在通过调
我想知道,Executors.newSingleThreadExecutor() 之间有什么区别?和 Executors.newFixedThreadPool(1) 以下摘自javadoc Unlik
我的问题是:使用 Executors.newFixedThreadPool(1) 有意义吗??。在两个线程(main + oneAnotherThread)场景中使用执行器服务是否有效?通过调用 ne
我有一个 Apache Spark 应用程序在集群模式下运行在 YARN 集群上(spark 在这个集群上有 3 个节点)。 当应用程序运行时,Spark-UI 显示 2 个执行程序(每个运行在不同的
我想知道是否有任何理由使用 Executor 而不是 ExecutorService。 据我所知,JDK 中没有实现 Executor 接口(interface),它也不是 ExecutorServi
我有多个使用 Celery Executor 的 dag,但我希望使用 Kubernetes Executor 运行一个特定的 dag。我无法推断出一种良好而可靠的方法来实现这一目标。 我有一个 ai
假设我们的 Controller 中有一个 Action 。在每次请求时,许多用户都会调用 performLogin。 def performLogin( ) = { Async {
创建和管理您自己的 ExecutorService 与使用 Spring Boot 的 @Async 方法和 @Bean 方法创建 Executor 添加一个@Bean来创建一个Executor 手动
问题从无到有,只有我在代码中所做的更改 - 安装了 RaSharper(但删除它并重新安装 Visual Studio 没有帮助)。 所以我使用 NUnit 3 来运行测试。 我有 Visual St
我们知道每个任务当时都在一个核心中执行。 假设我们有这样配置的节点集群: 10 节点。 每个节点 16 个核心。 每个节点 64 GB 内存。 我的问题是 有 1 个 16 核的执行程序和 16 个
我正在从 Jupyter Notebook 中初始化 PySpark,如下所示: from pyspark import SparkContext # conf = SparkConf().setAp
我正在向我的 Web 应用程序添加一个基于 Flask 的 API,以控制某些网络自动化功能的启动和停止。我遇到了一个奇怪的行为,即 Flask-Executor .submit() 方法调用的函数似
单元测试在本地运行良好。 在 Visual Studio 2017 托管生成代理上运行时,VSTest 任务失败并显示: 2018-12-08T10:42:16.3779907Z An excepti
我正在尝试制作一个执行器和线程的简单示例。 当我调用 newSingleThreadExecutor(new CustomThreadFactory) 时,一切顺利,但是当我使用 null 参数调用
对于一个线程,我通过以下代码段捕获未捕获的异常。但是,对于 ExecutorService executor = Executors.newFixedThreadPool(10);,如何捕获未捕获的异
我想创建一个 CompletableFuture,其返回值在 Kotlin 中的特定执行程序上运行。 下面的代码工作得很好。 return CompletableFuture.supplyAsync
考虑基本的固定线程池: Executors.newFixedThreadPool(MaxListeners) 我打算不断提交新任务 - 响应传入的 TCP 套接字服务请求。 然而,当每个任务中的Run
我们可以在定义 ThreadPoolExecutors 时提供 BlockingQueue 实现。但是,如果我使用工厂(执行器)创建单个线程池(如下所示),我想知道使用哪个阻塞队列。我猜它是一个 Li
我编写了一个程序来执行两个在 shell 前台运行的命令,直到在终端上按下 ^c。 外壳命令 ./weed master -mdir=/var/lib/qualebs/weed 上面命令的输出是 qu
我是一名优秀的程序员,十分优秀!