gpt4 book ai didi

java - 如何使用 MapReduce 函数通过 Spark 在 Java 中排序

转载 作者:行者123 更新时间:2023-12-02 21:21:01 25 4
gpt4 key购买 nike

您好我正在寻找一种在 Spark(使用 Java 代码)和使用 map reduce 中进行简单排序的方法。我对此很陌生,因此对 map/reduce 工作原理的一个很好的解释将非常有帮助。我已经阅读了一些可以的解释,但根本没有谈论对我更有帮助的代码。

我有一个输入数据文件,其中包含数百万个 ascii 100 字节记录/或更好的 100 字节二进制记录。我想对每条记录/行的前 10 个字节进行排序。这些文件大约 10 TB,所以数据量很大,我不确定最快的方法是什么。我将如何使用 map/reduce 来做到这一点。 Java 不是我的语言,所以写出实际代码会非常有帮助。

我现在所做的就是

SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSimpleSort");
sparkConf.setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = jsc.textFile("hdfs://localhost:19000/hdfsfiles/ASCII500Mill", 10);
lines.sortBy( new Function<String, String>()
{
@Override
public String call( String value ) {
return value.substring(0,10);
}
},true,1);


jsc.textFile("hdfs://localhost:19000/hdfsfiles/ASCII500Mill", 10);
jsc.stop();

编辑:所以我仍在努力,真的需要帮助。我可以很好地执行 map 功能,创建键,但是我不应该在那时只调用 sortByKey 吗?我在 SO 上阅读了一个有点类似的问题/答案,看来减少步骤仍然是必要的。我只是不明白这些调用在做什么的“为什么”或“如何”,以及我能做些什么来使它成为我能做的最简单的“MapReduce 排序”算法。我只需要 map().reduce().sortByKey() 或任何我需要的顺序来完成这项工作。任何帮助将不胜感激。

EDIT2:我还看到,在文本文件上使用 map 的人通常(从我所见)将文本文件拆分为单个单词(其他人通常对 map 和 textfiles 做什么,以及作为示例给出的内容Spark 文档/指南网站)。我已经按行排序(不是单个单词),所以也许我不需要 map ?我知道我正在按键排序,但这没有理由比使用每行前 10 个字节返回输入文件的“映射”RDD 更多。但话又说回来,我忘记了原始线的偏移/位置。抱歉这里的无知,我不习惯用 Java 编程,尤其是委托(delegate),即使它们类似于 C#,也不习惯 FlatMap 或 Spark,所以我在这里远离我的元素。再次,任何帮助,非常感谢。谢谢!

最佳答案

我发现这种排序虽然不是通过记录的前十个字节来进行,而是通过创建一个带有要记录的数字的 map /字典,然后计数然后排序。这不完全是我想要的,但它应该可以帮助任何来到这里的人。我从 spark 文档中提取了 map/reduce 部分。

import java.io.*;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.*;

public class JavaSparkPi {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
sparkConf.setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaRDD<String> lines = jsc.textFile("hdfs://localhost:19000/hdfsfiles/500mill", 10);
String s = lines.first();
System.out.println("First element in UNsorted array: " + s);
JavaPairRDD<String, Integer> pairs = lines.mapToPair(p -> new Tuple2(p, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
JavaPairRDD<String,Integer> sorted = counts.sortByKey();

s = lines.first();
System.out.println("First element in sorted array: " + s);
lines.saveAsTextFile("hdfs://localhost:19000/hdfsfiles/500millOUT4");
jsc.stop();
}
}

关于java - 如何使用 MapReduce 函数通过 Spark 在 Java 中排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37333673/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com