gpt4 book ai didi

hadoop - 具有 HadoopRDD 自定义 InputFormat 的 Apache Spark

转载 作者:可可西里 更新时间:2023-11-01 14:18:57 26 4
gpt4 key购买 nike

我目前正在研究 Apache Spark。我已经实现了自定义 InputFormat适用于通过 TCP 套接字读取键值记录的 Apache Hadoop。我想将此代码移植到 Apache Spark 并将其与 hadoopRDD() 一起使用功能。我的 Apache Spark 代码如下:

public final class SparkParallelDataLoad {

public static void main(String[] args) {
int iterations = 100;
String dbNodesLocations = "";
if(args.length < 3) {
System.err.printf("Usage ParallelLoad <coordinator-IP> <coordinator-port> <numberOfSplits>\n");
System.exit(1);
}
JobConf jobConf = new JobConf();
jobConf.set(CustomConf.confCoordinatorIP, args[0]);
jobConf.set(CustomConf.confCoordinatorPort, args[1]);
jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);

int numOfSplits = Integer.parseInt(args[2]);

CustomInputFormat.setCoordinatorIp(args[0]);
CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));

SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");

JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaPairRDD<LongWritable, Text> records = sc.hadoopRDD(jobConf,
CustomInputFormat.class, LongWritable.class, Text.class,
numOfSplits);

JavaRDD<LabeledPoint> points = records.map(new Function<Tuple2<LongWritable, Text>, LabeledPoint>() {

private final Log log = LogFactory.getLog(Function.class);
/**
*
*/
private static final long serialVersionUID = -1771348263117622186L;

private final Pattern SPACE = Pattern.compile(" ");
@Override
public LabeledPoint call(Tuple2<LongWritable, Text> tuple)
throws Exception {
if(tuple == null || tuple._1() == null || tuple._2() == null)
return null;
double y = Double.parseDouble(Long.toString(tuple._1.get()));
String[] tok = SPACE.split(tuple._2.toString());
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
if(tok[i].isEmpty() == false)
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, Vectors.dense(x));
}

});

System.out.println("Number of records: " + points.count());
LinearRegressionModel model = LinearRegressionWithSGD.train(points.rdd(), iterations);
System.out.println("Model weights: " + model.weights());

sc.stop();
}
}

在我的项目中,我还必须决定哪个 Spark Worker 将连接到哪个数据源(类似于具有 1:1 关系的“配对”过程)。因此,我创建了一些 InputSplit s 等于数据源的数量,以便我的数据并行发送到 SparkContext .我的问题如下:

  1. 方法的结果InpuSplit.getLength()影响多少条记录RecordReader返回?详细地说,我在测试运行中看到作业在仅返回一条记录后结束,只是因为我从 CustomInputSplit.getLength() 返回的值为 0。功能。

  2. 在 Apache Spark 上下文中,worker 的数量是否等于 InputSplits 的数量来 self 的 InputFormat至少为了执行 records.map()函数调用?

上面问题 2 的答案对我的项目非常重要。

谢谢,尼克

最佳答案

是的。Spark 的 sc.hadoopRDD 将创建一个 RDD,其分区数与 InputFormat.getSplits 报告的一样多。

hadoopRDD 的最后一个参数称为 minPartitions(在您的代码中为 numOfSplits)将用作 InputFormat.getSplits 的提示。但是 getSplits 返回的数字无论是更大还是更小都将被遵守。

请参阅 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L168 处的代码

关于hadoop - 具有 HadoopRDD 自定义 InputFormat 的 Apache Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24765063/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com