gpt4 book ai didi

python - 如何避免一个 Spark Streaming 窗口阻塞另一个窗口同时运行一些 native Python 代码

转载 作者:太空狗 更新时间:2023-10-29 19:27:39 27 4
gpt4 key购买 nike

我正在使用两个不同的窗口运行 Spark Streaming(在窗口上使用 SKLearn 训练模型,在另一个窗口上基于该模型预测值)我想知道如何避免一个窗口(“慢”训练窗口)来训练模型,而不会“阻塞”“快速”预测窗口。
我的简化代码如下所示:

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

stream = ssc.socketTextStream("localhost", 7000)


import Custom_ModelContainer

### Window 1 ###
### predict data based on model computed in window 2 ###

def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...

# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)

# send prediction to GUI

except Exception, e: print e

predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)


### Window 2 ###
### fit new model ###

def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...

X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())

# train test split etc...

model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)

except Exception, e: print e

modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)

(注意:Custom_ModelContainer 是我写的一个类,用来保存和检索训练好的模型)

我的设置通常运行良好,除了每次在第二个窗口中训练新模型时(大约需要一分钟),第一个窗口在模型训练完成之前不会计算预测。实际上,我想这是有道理的,因为模型拟合和预测都是在主节点上计算的(在非分布式设置中 - 由于 SKLearn)。

所以我的问题如下:是否可以在单个工作节点(而不是主节点)上训练模型?如果是这样,我如何才能实现后者并真正解决我的问题?

如果没有,关于如何在不延迟窗口 1 中的计算的情况下进行此类设置的任何其他建议?

非常感谢任何帮助。

编辑:我想更一般的问题是:如何在两个不同的 worker 上并行运行两个不同的任务?

最佳答案

免责声明:这只是一组想法。这些都没有经过实践测试。


您可以尝试一些事情:

  1. 不要收集预测scikit-learn 模型通常是可序列化的,因此可以在集群上轻松处理预测过程:

    def predict(time, rdd):
    ...

    model = Custom_ModelContainer.getmodel()
    pred = (df.rdd.map(lambda lp: lp.features.toArray())
    .mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
    ...

    它不仅应该并行预测,而且如果未将原始数据传递给 GUI,还应该减少必须收集的数据量。

  2. 尝试异步收集和发送数据。 PySpark 不提供 collectAsync 方法,但您可以尝试使用 concurrent.futures 实现类似的功能:

    from pyspark.rdd import RDD
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)

    def submit_to_gui(*args): ...

    def submit_if_success(f):
    if not f.exception():
    executor.submit(submit_to_gui, f.result())

    从 1 继续

    def predict(time, rdd):
    ...
    f = executor.submit(RDD.collect, pred)
    f.add_done_callback(submit_if_success)
    ...
  3. 如果您真的想使用本地 scikit-learn 模型,请尝试使用上述 future 来收集拟合。您也可以尝试只收集一次,尤其是在数据未缓存的情况下:

    def collect_and_train(df):
    y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
    ...
    return SVR().fit(X_train, y_train)

    def set_if_success(f):
    if not f.exception():
    Custom_ModelContainer.setModel(f.result())

    def trainModel(time, rdd):
    ...
    f = excutor.submit(collect_and_train, df)
    f.add_done_callback(set_if_success)
    ...
  4. 使用现有解决方案(例如 spark-sklearn)将训练过程转移到集群中或自定义方法:

    • 简单的解决方案 - 准备您的数据,合并 (1) 并使用 mapPartitions 训练单个模型。
    • 分布式解决方案 - 使用 mapPartitions 创建并验证每个分区的单独模型,收集模型并作为整体使用,例如通过平均或中值预测。
  5. 抛弃 scikit-learn 并使用可以在分布式流式环境中训练和维护的模型(例如 StreamingLinearRegressionWithSGD )。

    您当前的方法使 Spark 过时了。如果您可以在本地训练模型,那么您很有可能可以在本地机器上更快地执行所有其他任务。否则,您的程序将在 collect 上失败。

关于python - 如何避免一个 Spark Streaming 窗口阻塞另一个窗口同时运行一些 native Python 代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35035046/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com