gpt4 book ai didi

scala - 并行处理多个文件作为独立的 RDD

转载 作者:行者123 更新时间:2023-12-04 17:46:45 26 4
gpt4 key购买 nike

我有一个场景,其中必须对许多小文件(每个约 300MB)应用一定数量的操作,包括 group by。操作看起来像这样..
df.groupBy(....).agg(....)
现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”但是,它会创建一个 RDD 并将其分区以进行操作。但是,从操作来看,它是一个 group by 并且涉及很多 shuffle,如果文件是互斥的,这是不必要的。

我在看的是,一种可以在文件上创建独立 RDD 并独立操作它们的方法。

最佳答案

这与其说是一个完整的解决方案,不如说是一个想法,我还没有对其进行测试。

您可以从将数据处理管道提取到函数中开始。

def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}

如果您的文件很小,您可以调整 n参数使用尽可能少的分区来适应单个文件中的数据并避免混洗。这意味着您正在限制并发,但我们稍后会回到这个问题。
val n: Int = ??? 

接下来,您必须获得输入文件列表。此步骤取决于数据源,但大多数情况下它或多或少很简单:
val files: Array[String] = ???

接下来你可以使用 pipeline 映射上面的列表功能:
val rdds = files.map(f => pipeline(f, n))

由于我们在单个文件级别限制并发,因此我们希望通过提交多个作业来进行补偿。让我们添加一个简单的助手来强制评估并用 Future 包装它
import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}

最后我们可以在 rdds 上使用上面的助手:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)

根据您的要求,您可以添加 onComplete回调或使用 react 流来收集结果。

关于scala - 并行处理多个文件作为独立的 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31912858/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com