gpt4 book ai didi

java - Spark - 使用 foreachpartition 收集分区

转载 作者:行者123 更新时间:2023-11-30 07:48:58 26 4
gpt4 key购买 nike

我们正在使用 spark 进行文件处理。我们正在处理相当大的文件,每个文件大约 30 GB,大约有 40-50 百万行。这些文件是格式化的。我们将它们加载到数据框中。最初的要求是识别符合条件的记录并将它们加载到 MySQL。我们能够做到这一点。

要求最近发生了变化。不符合标准的记录现在将存储在备用数据库中。这引起了问题,因为集合的大小太大了。我们正在尝试独立收集每个分区并按照此处的建议合并到一个列表中

https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/dont_collect_large_rdds.html

我们不熟悉 scala,所以我们在将其转换为 Java 时遇到了麻烦。我们如何逐个遍历分区并收集?

谢谢

最佳答案

请使用 df.foreachPartition 对每个分区独立执行,不会返回给驱动程序。您可以在每个执行器级别将匹配结果保存到数据库中。如果您想在驱动程序中收集结果,请使用不推荐用于您的情况的 mappartitions。

请引用以下链接

Spark - Java - foreachPartition

dataset.foreachPartition(new ForeachPartitionFunction<Row>() {
public void call(Iterator<Row> r) throws Exception {
while (t.hasNext()){

Row row = r.next();
System.out.println(row.getString(1));

}
// do your business logic and load into MySQL.
}
});

对于 map 分区:

// You can use the same as Row but for clarity I am defining this.

public class ResultEntry implements Serializable {
//define your df properties ..
}


Dataset<ResultEntry> mappedData = data.mapPartitions(new MapPartitionsFunction<Row, ResultEntry>() {

@Override
public Iterator<ResultEntry> call(Iterator<Row> it) {
List<ResultEntry> filteredResult = new ArrayList<ResultEntry>();
while (it.hasNext()) {
Row row = it.next()
if(somecondition)
filteredResult.add(convertToResultEntry(row));
}
return filteredResult.iterator();
}
}, Encoders.javaSerialization(ResultEntry.class));

希望这对您有所帮助。

拉维

关于java - Spark - 使用 foreachpartition 收集分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48818202/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com