- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我的问题是我的 pyspark 作业没有并行运行。
代码和数据格式:
我的 PySpark 看起来像这样(显然是经过简化的):
class TheThing:
def __init__(self, dInputData, lDataInstance):
# ...
def does_the_thing(self):
"""About 0.01 seconds calculation time per row"""
# ...
return lProcessedData
#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = {'dPreloadedData': dPreloadedData}
#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
rddData
.map(
lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
)
)
llCalculated = rddCalculated.collect()
#save as csv, export to storage
在 Dataproc 集群上运行:
集群是通过 Dataproc UI 创建的.
作业是这样执行的:
gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>
我通过 UI 观察了工作状态,started like this .浏览它时,我注意到我的工作节点中只有一个(看似随机的)在做任何事情。所有其他人都完全闲置。
PySpark 的重点是并行运行这个东西,显然不是这样。我已经在各种集群配置中运行了这些数据,最后一个是巨大的,这是我注意到它是单节点使用的时候。因此,为什么我的工作需要很长时间才能完成,而且时间似乎与集群大小无关。
所有使用较小数据集的测试在我的本地机器和集群上都毫无问题地通过了。我真的只需要高档。
编辑
我变了
llCalculated = rddCalculated.collect()
#... save to csv and export
到
rddCalculated.saveAsTextFile("gs://storage-bucket/results")
并且只有一个节点仍在执行工作。
最佳答案
根据您是从 GCS 还是 HDFS 加载 rddData
,默认分割大小可能是 64MB 或 128MB,这意味着您的 200MB 数据集只有 2-4 个分区。 Spark 之所以这样做,是因为典型的基本数据并行任务处理数据的速度足够快,64MB-128MB 意味着可能需要数十秒的处理时间,因此拆分成较小的并行 block 没有任何好处,因为启动开销将占主导地位。
在您的情况下,听起来每 MB 的处理时间要长得多,因为您加入了另一个数据集,并且可能对每条记录执行了相当重量级的计算。所以你需要更多的分区,否则无论你有多少节点,Spark 都不知道要拆分成超过 2-4 个工作单元(如果每台机器都可能打包到一台机器上)有多个核心)。
所以你只需要调用repartition
:
rddCalculated = (
rddData
.repartition(200)
.map(
lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
)
)
或者将重新分区添加到较早的行:
rddData = rddData.repartition(200)
或者如果你在读取时重新分区,你可能会有更好的效率:
rddData = sc.textFile("gs://storage-bucket/your-input-data", minPartitions=200)
关于python-2.7 - Dataproc Pyspark 作业仅在一个节点上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37589472/
我正在使用 JavaFX 8 创建一个应用程序。我使用拖/放动态更改网格 Pane 的内容。我希望每行或每行/列迭代 GridPane 内容。JavaFX 允许通过指定行和列在 GridPane 中添
我正在尝试将图像拖放到div上。图像没有被拖到div上并给出以下错误 Uncaught TypeError: Failed to execute 'appendChild' on 'Node': pa
我正在 android studio 中创建内部构建 AR 导航。我正在寻找一种方法将 anchor 与其他 anchor 或 anchor 节点/节点“连接”起来。我不确定使用哪一个。基于我将强制用
我在 Hive 上运行一些作业:首先是 4 节点,然后是 2 节点。令我惊讶的是,我的 2 节点性能比我的 4 节点更好。 首先,我在一个 4 节点(4 个事件节点)上运行查询,然后关闭 2 个节点(
我有 Node* current ,我在其中存储指向列表“顶部”当前节点的指针。当我将一个新节点设置为当前节点时,出现错误: '=' : cannot convert from 'CircularDo
我是 dcos Mesos 的新手,在本地 Ubuntu 机器上安装了 dc os。 我可以查看 dcos 仪表板。 但我无法使用 dcos node ssh --master-proxy --lea
在 JavaFX 中,是否有类似 setLayout(); 的东西?或 setBounds(); ? 例如,我想将按钮定位到我想要的位置。 最佳答案 JavaFX 场景图上的所有内容都是 Node .
我正在开发一个 JavaFX 应用程序,其中我开发的类(从 javafx.scene.Parent 扩展)是根据用户在 ListView 控件中单击的条目动态创建的。 只是要清楚这个节点,它不是使用像
我正在尝试为节点-边缘关系创建一个类图,因为它可以在有向图中找到。我想传达的是,Nodes 引用了 Edges,Edges 也引用了 Nodes。每个 Edge 都恰好需要两个 Node(源和目标)。
在mapreduce作业期间,单个任务将在随机节点上运行,是否有任何方法限制应在其中运行任务的节点? 最佳答案 Hadoop不会选择节点来随机运行任务。考虑到数据局部性,否则将有很多网络开销。 任务与
有什么区别: a) nodetool 重建 b) nodetool 修复 [-pr] 换句话来说,各个命令到底是做什么的? 最佳答案 nodetool重建:类似于引导过程(当您向集群添加新节点时),但
我已将第一个 OneToMany 关系添加到我的 hibernate 3.6.10 项目中。这是一个类: /** * */ package com.heavyweightsoftware.leal
是否有可能找到正在监听触发当前函数的事件的元素? 在下面的代码中,event.target 返回 #xScrollPane 和 event.currentTarget 和 event 的最低子节点.f
我正在尝试覆盖我数据库中的一些数据。结构很简单,就是: recipes { user_1{ recipe_1{data} recipe_2{data} } user_2{
我使用 setInterval 来运行该函数,但它会多次执行函数 2... 如何在输入中插入一个值后执行函数 第一个输入与其余输入的距离不同 如何在插入 val(tab 选项)后将插入从 1 个输入移
我不知道代码有什么问题,但在 visual studio 中不断收到这些错误消息。 Error 18 error C1903: unable to recover from previous e
我正在尝试从其类中获取 SharePoint 搜索导航节点的对象。 var nodes = $("div.ms-qSuggest-listItem"); 我正在获取节点对象,现在想要获取“_promp
D:\nodeP>node main.js module.js:327 抛出错误; ^ 错误:在 Function.Module 的 Function.Module._resolveFilename
struct node{ int key, prior, cnt, val; node *l, *r; node(){} node(int nkey) : key(nkey),
我有以下代码使用迭代器将项目插入双链表。这就是我们被要求这样做的方式。代码有效,但问题是我有 24 字节的绝对内存泄漏。 NodeIterator insert(NodeIterator & itrP
我是一名优秀的程序员,十分优秀!