gpt4 book ai didi

scala - 如何使用scala并行化spark中的for循环?

转载 作者:行者123 更新时间:2023-12-05 00:18:53 45 4
gpt4 key购买 nike

例如,我们有一个包含过去 3 年 2000 个股票代码收盘价的拼花文件,我们要计算每个代码的 5 天移动平均线。

所以我创建了一个 spark SQLContext 然后

val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()

要获取符号列表,
val symbols = marketData.select("SYMBOL").distinct().collect()

这是 for 循环:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}

显然,在 spark 上执行 for 循环很慢,而且 save()对于每个小的结果也会减慢进程(我已经尝试在 for 循环之外定义一个 var result 并将所有输出联合起来进行 IO 操作,但是我遇到了 stackoverflow 异常),那么我如何并行化 for 循环并优化IO操作?

最佳答案

您编写的程序在驱动程序(“主”) Spark 节点中运行。如果您在并行结构 (RDD) 上运行,则此程序中的表达式只能并行化。

尝试这个:

marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg)  }.foreach{ case (symbol,averages) => averages.save() }

哪里 symbolize接受一行符号 x 天并返回一个元组(符号,天)。

关于scala - 如何使用scala并行化spark中的for循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37005672/

45 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com