gpt4 book ai didi

python - 在 PySpark 中处理数据之前,如何在所有 Spark 工作人员上运行一个函数?

转载 作者:IT老高 更新时间:2023-10-28 21:03:29 28 4
gpt4 key购买 nike

我正在使用 YARN 在集群中运行 Spark Streaming 任务。集群中的每个节点都运行多个 spark worker。在流式传输开始之前,我想对集群中所有节点上的所有工作人员执行“设置”功能。

流式传输任务将传入消息分类为垃圾邮件或非垃圾邮件,但在此之前,它需要将最新的预训练模型从 HDFS 下载到本地磁盘,如以下伪代码示例:

def fetch_models():
if hadoop.version > local.version:
hadoop.download()

我在 SO 上看到了以下示例:

sc.parallelize().map(fetch_models)

但在 Spark 1.6 parallelize() 中需要使用一些数据,就像我现在正在做的这种糟糕的解决方法:

sc.parallelize(range(1, 1000)).map(fetch_models)

为了确保该函数在所有工作人员上运行,我将范围设置为 1000。我也不确切知道运行时集群中有多少工作人员。

我已经阅读了编程文档并无情地搜索了谷歌,但我似乎无法找到任何方法来实际将任何东西分发给所有工作人员而无需任何数据。

在这个初始化阶段完成后,流式任务照常处理来自 Kafka 的传入数据。

我使用模型的方式是运行一个类似的函数:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
.repartition(spark_partitions)\
.foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))

理论上我可以在 on_partition 函数中检查模型是否是最新的,尽管在每个批处理上都这样做真的很浪费。我想在 Spark 开始从 Kafka 检索批处理之前执行此操作,因为从 HDFS 下载可能需要几分钟...

更新:

需要明确的是:这不是如何分发文件或如何加载文件的问题,而是如何在不操作任何数据的情况下对所有工作人员运行任意方法。

澄清当前实际加载模型的含义:

def on_partition(config, partition):
if not MyClassifier.is_loaded():
MyClassifier.load_models(config)

handle_partition(config, partition)

而 MyClassifier 是这样的:

class MyClassifier:
clf = None

@staticmethod
def is_loaded():
return MyClassifier.clf is not None

@staticmethod
def load_models(config):
MyClassifier.clf = load_from_file(config)

静态方法,因为 PySpark 似乎无法使用非静态方法序列化类(类的状态与其他工作人员无关)。在这里,我们只需调用一次 load_models(),并且在所有 future 的批处理中都将设置 MyClassifier.clf。这是真的不应该为每批做的事情,这是一次性的事情。与使用 fetch_models() 从 HDFS 下载文件相同。

最佳答案

如果您只想在工作机器之间分发文件,最简单的方法是使用 SparkFiles机制:

some_path = ...  # local file, a file in DFS, an HTTP, HTTPS or FTP URI.
sc.addFile(some_path)

并使用 SparkFiles.get 和标准 IO 工具在工作人员上检索它:

from pyspark import SparkFiles

with open(SparkFiles.get(some_path)) as fw:
... # Do something

如果您想确保实际加载模型,最简单的方法是在模块导入时加载。假设 config 可用于检索模型路径:

  • model.py:

    from pyspark import SparkFiles

    config = ...
    class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
    return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
    path = SparkFiles.get(config.get("model_file"))
    MyClassifier.clf = load_from_file(path)

    # Executed once per interpreter
    MyClassifier.load_models(config)
  • main.py:

    from pyspark import SparkContext

    config = ...

    sc = SparkContext("local", "foo")

    # Executed before StreamingContext starts
    sc.addFile(config.get("model_file"))
    sc.addPyFile("model.py")

    import model

    ssc = ...
    stream = ...
    stream.map(model.MyClassifier.do_something).pprint()

    ssc.start()
    ssc.awaitTermination()

关于python - 在 PySpark 中处理数据之前,如何在所有 Spark 工作人员上运行一个函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37343437/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com