- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我正在使用 YARN 在集群中运行 Spark Streaming 任务。集群中的每个节点都运行多个 spark worker。在流式传输开始之前,我想对集群中所有节点上的所有工作人员执行“设置”功能。
流式传输任务将传入消息分类为垃圾邮件或非垃圾邮件,但在此之前,它需要将最新的预训练模型从 HDFS 下载到本地磁盘,如以下伪代码示例:
def fetch_models():
if hadoop.version > local.version:
hadoop.download()
我在 SO 上看到了以下示例:
sc.parallelize().map(fetch_models)
但在 Spark 1.6 parallelize()
中需要使用一些数据,就像我现在正在做的这种糟糕的解决方法:
sc.parallelize(range(1, 1000)).map(fetch_models)
为了确保该函数在所有工作人员上运行,我将范围设置为 1000。我也不确切知道运行时集群中有多少工作人员。
我已经阅读了编程文档并无情地搜索了谷歌,但我似乎无法找到任何方法来实际将任何东西分发给所有工作人员而无需任何数据。
在这个初始化阶段完成后,流式任务照常处理来自 Kafka 的传入数据。
我使用模型的方式是运行一个类似的函数:
spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
.repartition(spark_partitions)\
.foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))
理论上我可以在 on_partition
函数中检查模型是否是最新的,尽管在每个批处理上都这样做真的很浪费。我想在 Spark 开始从 Kafka 检索批处理之前执行此操作,因为从 HDFS 下载可能需要几分钟...
更新:
需要明确的是:这不是如何分发文件或如何加载文件的问题,而是如何在不操作任何数据的情况下对所有工作人员运行任意方法。
澄清当前实际加载模型的含义:
def on_partition(config, partition):
if not MyClassifier.is_loaded():
MyClassifier.load_models(config)
handle_partition(config, partition)
而 MyClassifier 是这样的:
class MyClassifier:
clf = None
@staticmethod
def is_loaded():
return MyClassifier.clf is not None
@staticmethod
def load_models(config):
MyClassifier.clf = load_from_file(config)
静态方法,因为 PySpark 似乎无法使用非静态方法序列化类(类的状态与其他工作人员无关)。在这里,我们只需调用一次 load_models(),并且在所有 future 的批处理中都将设置 MyClassifier.clf。这是真的不应该为每批做的事情,这是一次性的事情。与使用 fetch_models() 从 HDFS 下载文件相同。
最佳答案
如果您只想在工作机器之间分发文件,最简单的方法是使用 SparkFiles
机制:
some_path = ... # local file, a file in DFS, an HTTP, HTTPS or FTP URI.
sc.addFile(some_path)
并使用 SparkFiles.get
和标准 IO 工具在工作人员上检索它:
from pyspark import SparkFiles
with open(SparkFiles.get(some_path)) as fw:
... # Do something
如果您想确保实际加载模型,最简单的方法是在模块导入时加载。假设 config
可用于检索模型路径:
model.py
:
from pyspark import SparkFiles
config = ...
class MyClassifier:
clf = None
@staticmethod
def is_loaded():
return MyClassifier.clf is not None
@staticmethod
def load_models(config):
path = SparkFiles.get(config.get("model_file"))
MyClassifier.clf = load_from_file(path)
# Executed once per interpreter
MyClassifier.load_models(config)
main.py
:
from pyspark import SparkContext
config = ...
sc = SparkContext("local", "foo")
# Executed before StreamingContext starts
sc.addFile(config.get("model_file"))
sc.addPyFile("model.py")
import model
ssc = ...
stream = ...
stream.map(model.MyClassifier.do_something).pprint()
ssc.start()
ssc.awaitTermination()
关于python - 在 PySpark 中处理数据之前,如何在所有 Spark 工作人员上运行一个函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37343437/
是否有用于手动测试的代码覆盖工具?比如我新写了30行代码,编译,然后运行,有什么办法可以快速验证这30行都运行了吗? 另外,后来,在我将代码 checkin 正式版本后,有什么方法可以验证测试部门在进
老实说,这是一个家庭作业问题,但我已经浪费了好几个小时,而且无法正确解决。它返回错误数量的结果或错误的数据: 我需要选择参与指导电影和/或在电影中表演的每个人以及他们所做的次数,如果至少 5 次。 有
我正在尝试测试 MacOS 的应用内购买。输入测试用户凭据后,App Store 提示:“当前收据无效或 ds 人员 ID 不匹配。”并且购买失败。 最佳答案 我尝试了很多方法来解决这个问题。 Get
我正在为 Jenkins 使用 ActiveDirectory 插件,因此用户必须使用他们的凭据登录到 Jenkins。然后用户在 Jenkins 中被称为 joe.doe,这很完美。 当同一个人 c
如何从 Infopath 人员/组选取器检索电子邮件地址?当我将人员/组选取器添加到 infopath 表单时,我只得到 3 个字段 DisplayName、AccountId、AccountType
在 Snow Leopard 中,可以在 iCal 事件中显示空闲/忙碌时间。我搜索了 CalStore.framework 的 header ,但找不到任何描述该字段的属性。如何检索日历事件的忙/闲
是否有人成功地从专门针对 SharePoint 2013 的新建或编辑表单中获取用户(个人或组)字段的值? 我已经尝试了通过搜索互联网找到的所有解决方案以及我自己能想到的所有解决方案,所有结果都为空白
所以我需要将一个 Twitter 帐户添加到 ABRecordRef 中。然而,最快的方法似乎是获取社交资料属性的多值引用,创建它的可变版本,查找它是否有 Twitter 条目,如果已经有,则创建
我正在尝试将使用 Tomcat(最初是 5.5,但可以与 7 一起使用)在 MyEclipse 中开发的应用程序部署到我们的演示服务器 (Sun Java Web Server 7)。不幸的是,所有设
我是一名优秀的程序员,十分优秀!