
java - Getting details of the files loaded by Spark


To load files in Spark, I used these built-in methods:

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);

JavaPairRDD<String, String> miao = jsc.wholeTextFiles(SOURCE_PATH);

This gives me a byte or String representation of each file picked up from the folder, stored in the value of the PairRDD, while the key holds the file name.
How can I get details of those files? Something like:

File miao = new File(path);
// this kind of details
long date = miao.lastModified();

Should I convert them back into Files, read them, and then turn them into another byte array? Is there a faster way?
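
If only the metadata is needed, the paths already stored as the keys of the PairRDD can be looked up directly through the Hadoop FileSystem API. A minimal sketch, assuming the paths resolve against the default Hadoop configuration (the class name FileDetailsSketch is only illustrative):

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileDetailsSketch {

    // Returns the last-modified time of a file, formatted as text.
    // FileStatus also exposes getOwner(), getLen() and getAccessTime().
    public static String lastModified(String pathString) throws IOException {
        Path p = new Path(pathString);
        FileSystem fs = p.getFileSystem(new Configuration());
        FileStatus status = fs.getFileStatus(p);
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                .format(new Date(status.getModificationTime()));
    }
}

Collecting the keys on the driver (for example with miao.keys().collect()) and calling such a helper avoids re-reading the file contents. The answer below goes further and ships the metadata together with the contents inside the RDD itself.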

Best Answer

You can write a custom input format and pass that inputFormatClass to the newAPIHadoopFile method on SparkContext. This input format uses a custom RecordReader, and the custom RecordReader reads the file contents along with other file-related information (i.e. author, modification date, etc.). You also need to write a custom Writable class to hold the file information and the file contents read by the record reader.

The complete working code is given below. It uses a custom input format class called RichFileInputFormat. RichFileInputFormat is a whole-file input format, meaning there is exactly one split per input file. This further means the number of RDD partitions will equal the number of input files: if your input path contains 10 files, the resulting RDD will have 10 partitions, regardless of the size of the input files.

You can load the files by calling this custom input format from the SparkContext as follows:

JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(args[1], RichFileInputFormat.class, Text.class, FileInfoWritable.class, new Configuration());

So your RDD keys will be the file paths and the values will be FileInfoWritable instances containing both the file contents and the other file-related information.
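
A rough sketch of how the resulting rdd might then be consumed (it assumes the rdd from the call above, the classes listed below, and the additional imports PairFunction and Tuple2). Note that Text and FileInfoWritable are Hadoop Writables rather than java.io.Serializable, so copy the fields you need into plain Strings before calling collect() or any shuffle operation:

// One split per file, so the partition count should match the number of input files.
System.out.println("partitions = " + rdd.partitions().size());

// Copy the Writable fields into plain Strings before collecting them on the driver.
JavaPairRDD<String, String> pathAndDate = rdd.mapToPair(
        new PairFunction<Tuple2<Text, FileInfoWritable>, String, String>() {
            public Tuple2<String, String> call(Tuple2<Text, FileInfoWritable> t) {
                // getCreatedDate() holds the modification time read in the RecordReader below
                return new Tuple2<String, String>(t._1().toString(), t._2().getCreatedDate());
            }
        });

for (Tuple2<String, String> entry : pathAndDate.collect()) {
    System.out.println(entry._1() + " modified at " + entry._2());
}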

The complete working code is pasted below:

  1. Custom InputFormat class

package nk.stackoverflow.spark;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class RichFileInputFormat extends FileInputFormat<Text, FileInfoWritable> {

    @Override
    public RecordReader<Text, FileInfoWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new RichFileRecordReader();
    }

    // Returning false makes this a whole-file input format: one split
    // (and therefore one RDD partition) per input file.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
  2. RecordReader

package nk.stackoverflow.spark;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class RichFileRecordReader extends RecordReader<Text, FileInfoWritable> {
    private String author;
    private String createdDate;
    private String owner;
    private String lastModified;
    private String content;
    private boolean processed;

    private Text key;
    private Path path;
    private FileSystem fs;

    public RichFileRecordReader() {
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        final FileSplit fileSplit = (FileSplit) split;
        final Path path = fileSplit.getPath();
        // Use the task's Hadoop configuration (the original answer obtained it
        // through Spark's internal SparkHadoopUtil helper).
        this.fs = path.getFileSystem(context.getConfiguration());
        final FileStatus stat = this.fs.getFileStatus(path);
        this.path = path;
        this.author = stat.getOwner();
        this.owner = stat.getOwner();
        // FileStatus does not expose a creation time, so the modification and
        // access times are used as approximations here.
        this.createdDate = String.valueOf(stat.getModificationTime());
        this.lastModified = String.valueOf(stat.getAccessTime());
        this.key = new Text(path.toString());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            FSDataInputStream stream = null;
            try {
                // Read the whole file into memory as a single record.
                // Note: a single read() may return fewer bytes than the file
                // length for very large files.
                final int len = (int) this.fs.getFileStatus(this.path).getLen();
                final byte[] data = new byte[len];
                stream = this.fs.open(this.path);
                final int read = stream.read(data);
                this.content = new String(data, 0, read);
                processed = true;
                return true;
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (stream != null) {
                    try {
                        stream.close();
                    } catch (IOException ie) {
                        ie.printStackTrace();
                    }
                }
            }
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public FileInfoWritable getCurrentValue() throws IOException, InterruptedException {
        final FileInfoWritable fileInfo = new FileInfoWritable();
        fileInfo.setContent(this.content);
        fileInfo.setAuthor(this.author);
        fileInfo.setCreatedDate(this.createdDate);
        fileInfo.setOwner(this.owner);
        fileInfo.setPath(this.path.toString());
        return fileInfo;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
    }
}
  3. Writable class

package nk.stackoverflow.spark;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.hadoop.io.Writable;

import com.google.common.base.Charsets;

public class FileInfoWritable implements Writable {
    private final static Charset CHARSET = Charsets.UTF_8;
    private String createdDate;
    private String owner;
    // private String lastModified;
    private String content;
    private String path;

    public FileInfoWritable() {
    }

    public void readFields(DataInput in) throws IOException {
        this.createdDate = readString(in);
        this.owner = readString(in);
        // this.lastModified = readString(in);
        this.content = readString(in);
        this.path = readString(in);
    }

    public void write(DataOutput out) throws IOException {
        writeString(createdDate, out);
        writeString(owner, out);
        // writeString(lastModified, out);
        writeString(content, out);
        writeString(path, out);
    }

    private String readString(DataInput in) throws IOException {
        final int n = in.readInt();
        final byte[] content = new byte[n];
        in.readFully(content);
        return new String(content, CHARSET);
    }

    private void writeString(String str, DataOutput out) throws IOException {
        // Write the UTF-8 byte length rather than the character count so that
        // readString() reads back exactly the bytes that were written.
        final byte[] bytes = str.getBytes(CHARSET);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    public String getCreatedDate() {
        return createdDate;
    }

    public void setCreatedDate(String createdDate) {
        this.createdDate = createdDate;
    }

    public String getAuthor() {
        return owner;
    }

    public void setAuthor(String author) {
        this.owner = author;
    }

    /*public String getLastModified() {
        return lastModified;
    }*/

    /*public void setLastModified(String lastModified) {
        this.lastModified = lastModified;
    }*/

    public String getOwner() {
        return owner;
    }

    public void setOwner(String owner) {
        this.owner = owner;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
  4. Main class showing how to use it

package nk.stackoverflow.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class CustomInputFormat {
    public static void main(String[] args) {
        // args[0] = application name, args[1] = input path
        SparkConf conf = new SparkConf();
        conf.setAppName(args[0]);
        conf.setMaster("local[*]");
        final String inputPath = args[1];
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(inputPath, RichFileInputFormat.class,
                Text.class, FileInfoWritable.class, new Configuration());

        rdd.foreach(new VoidFunction<Tuple2<Text, FileInfoWritable>>() {
            public void call(Tuple2<Text, FileInfoWritable> t) throws Exception {
                final Text filePath = t._1();
                final String fileContent = t._2().getContent();
                System.out.println("file " + filePath + " has contents= " + fileContent);
            }
        });

        sc.close();
    }
}
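
For reference, one way such a job is commonly packaged and launched with spark-submit; the jar name and the input path below are placeholders, and the two trailing arguments map to args[0] (application name) and args[1] (input directory) in main():

# The master is hard-coded to local[*] inside CustomInputFormat, so no --master flag is needed here.
spark-submit \
  --class nk.stackoverflow.spark.CustomInputFormat \
  spark-file-details-example.jar \
  FileDetailsApp \
  /path/to/input/files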

Regarding "java - Getting details of the files loaded by Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/38118462/
