
java - How to get the file name of a record in a Spark RDD (JavaRDD)


I am loading multiple files into a JavaRDD:

JavaRDD<String> allLines = sc.textFile("hdfs://path/*.csv");

After loading the files, I modify each record and want to save the results. However, I also need to save the original file name (as an ID) along with each record for future reference. Is there any way to get the original file name of an individual record in the RDD? Thanks.

Best Answer

You can try something along the lines of the following snippet:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaNewHadoopRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Read the files through the new Hadoop API so the input split
// (and therefore the source file name) is visible per partition.
JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
        "hdfs://path/*.csv",
        TextInputFormat.class,
        LongWritable.class,
        Text.class,
        new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD<LongWritable, Text>) javaPairRDD;

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
    FileSplit fileSplit = (FileSplit) inputSplit;
    String fileName = fileSplit.getPath().getName();

    Stream<Tuple2<String, String>> stream =
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
                    .map(line -> {
                        String lineText = line._2().toString();
                        // emit the file name as the key and the line as the value
                        return new Tuple2<>(fileName, lineText);
                    });
    return stream.iterator();
}, true);
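Once each line is paired with the name of the file it came from, you can apply your modifications and save the result. A minimal follow-up sketch; the comma separator and the output path hdfs://path/output are assumptions, not part of the original answer:

// Hypothetical usage: flatten each (fileName, line) pair into one
// delimited output line and save it (separator and path are assumed).
JavaRDD<String> outputLines = namedLinesRDD.map(pair -> pair._1() + "," + pair._2());
outputLines.saveAsTextFile("hdfs://path/output");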

Update (for Java 7)

import java.util.Iterator;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.spark.api.java.function.Function2;

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
        new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
            @Override
            public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
                FileSplit fileSplit = (FileSplit) inputSplit;
                final String fileName = fileSplit.getPath().getName();
                // Wrap the partition's iterator so each line is lazily
                // paired with the file name of its input split.
                return new Iterator<Tuple2<String, String>>() {
                    @Override
                    public boolean hasNext() {
                        return lines.hasNext();
                    }
                    @Override
                    public Tuple2<String, String> next() {
                        Tuple2<LongWritable, Text> entry = lines.next();
                        return new Tuple2<String, String>(fileName, entry._2().toString());
                    }
                };
            }
        },
        true
);
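If the individual files are small, an alternative worth knowing (not from the original answer, but standard Spark API) is JavaSparkContext.wholeTextFiles, which pairs each file path with the file's entire content. A sketch, assuming Spark 2.x where FlatMapFunction returns an Iterator:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// wholeTextFiles yields (filePath, fileContent) pairs; each file is read
// as a single record, so this only suits files that fit in memory.
JavaPairRDD<String, String> filesWithNames = sc.wholeTextFiles("hdfs://path/*.csv");
JavaRDD<Tuple2<String, String>> namedLines = filesWithNames.flatMap(pair -> {
    String fileName = new Path(pair._1()).getName();
    List<Tuple2<String, String>> out = new ArrayList<>();
    for (String line : pair._2().split("\n")) {
        out.add(new Tuple2<>(fileName, line));
    }
    return out.iterator();
});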

Regarding java - How to get the file name of a record in a Spark RDD (JavaRDD), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32465469/
