gpt4 book ai didi

python-2.7 - 如何使用 PySpark 并行运行独立转换?

转载 作者:太空宇宙 更新时间:2023-11-03 21:37:52 24 4
gpt4 key购买 nike

我正在尝试使用 PySpark 并行运行 2 个函数,在单个 RDD 上执行完全独立的转换。有哪些方法可以做到这一点?

def doXTransforms(sampleRDD):
(X transforms)

def doYTransforms(sampleRDD):
(Y Transforms)

if __name__ == "__main__":
sc = SparkContext(appName="parallelTransforms")
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)

rows_rdd = hive_context.sql("select * from tables.X_table")

p1 = Process(target=doXTransforms , args=(rows_rdd,))
p1.start()
p2 = Process(target=doYTransforms, args=(rows_rdd,))
p2.start()
p1.join()
p2.join()
sc.stop()

这不起作用,我现在明白这不起作用。但有没有其他方法可以让这项工作发挥作用呢?具体有没有针对python-spark的解决方案?

最佳答案

只需使用线程并确保集群有足够的资源来同时处理这两个任务。

from threading import Thread
import time

def process(rdd, f):
def delay(x):
time.sleep(1)
return f(x)
return rdd.map(delay).sum()


rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()

可以说,这在实践中并不常用,但在其他方面应该可以正常工作。

您可以进一步使用in-application scheduling使用FAIR调度程序和调度程序池可以更好地控制执行策略。

您也可以尝试pyspark-asyncactions (免责声明 - 该答案的作者也是该包的作者)它提供了一组围绕 Spark API 和 concurrent.futures 的包装器:

import asyncactions
import concurrent.futures

f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]

关于python-2.7 - 如何使用 PySpark 并行运行独立转换?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53128591/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com