- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
为了在 Spark 中加载文件,我使用了这些内置方法:
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);
或
JavaPairRDD<String, String> miao = jsc.wholeTextFiles(SOURCE_PATH);
我有一个字节或字符串表示我从文件夹中获取的文件,它存储在 PairRDD 的值中。 key 包含文件名。
我怎样才能获得这些文件的详细信息?喜欢
File miao = new File(path);
//this kind of details
String date = miao.getLastModified();
我应该将它们重新转换回文件,然后读取它们,然后将它们制成另一个字节数组吗?有没有更快的过程?
最佳答案
您可以编写自定义输入格式并将该 inputFormatClass 传递给 SparkContext 上的 newApiHadoopFile 方法。此 inputFormat 将使用自定义 RecordReader,自定义 recordReader 将读取文件内容以及其他文件相关信息(即作者、修改日期等)。您需要编写一个自定义的 Writable 类来保存文件信息和记录读取器读取的文件内容。
完整的工作代码如下。此代码使用名为 RichFileInputFormat 的自定义输入格式类。 RichFileInputFormat 是一种整体文件输入格式,这意味着每个输入文件只有一个分割。这进一步意味着 rdd 分区的数量将等于输入文件的数量。因此,如果您的输入路径包含 10 个文件,那么无论输入文件的大小如何,生成的 rdd 中都将包含 10 个分区。
您可以通过以下方式从 SparkContext 调用此自定义 inputFormat 来加载文件:-
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(args[1], RichFileInputFormat.class, Text.class,FileInfoWritable.class, new Configuration());
因此,您的 rdd 键将是 filePath,值将是 FileInfoWritable,其中包含文件内容和其他文件相关信息。
完整的工作代码粘贴在下面:-
自定义输入格式类
package nk.stackoverflow.spark;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class RichFileInputFormat extends FileInputFormat<Text, FileInfoWritable> {
@Override
public RecordReader<Text, FileInfoWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new RichFileRecordReader();
}
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
包 nk.stackoverflow.spark;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream; import
org.apache.hadoop.fs.FileStatus; import
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import
org.apache.hadoop.mapreduce.InputSplit; import
org.apache.hadoop.mapreduce.RecordReader; import
org.apache.hadoop.mapreduce.TaskAttemptContext; import
org.apache.hadoop.mapreduce.lib.input.FileSplit; import
org.apache.spark.deploy.SparkHadoopUtil;
public class RichFileRecordReader extends RecordReader<Text,
FileInfoWritable> { private String author; private String
createdDate; private String owner; private String lastModified;
private String content; private boolean processed;
private Text key; private Path path; private FileSystem fs;
public RichFileRecordReader() {
}
@Override public void initialize(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException
{ // this.recordReader.initialize(split, context); final
FileSplit fileSplit = (FileSplit) split; final Path path =
fileSplit.getPath(); this.fs =
path.getFileSystem(SparkHadoopUtil.get().getConfigurationFromJobContext(context));
final FileStatus stat = this.fs.getFileStatus(path); this.path =
path; this.author = stat.getOwner(); this.createdDate =
String.valueOf(stat.getModificationTime()); this.lastModified =
String.valueOf(stat.getAccessTime()); this.key = new
Text(path.toString()); }
@Override public boolean nextKeyValue() throws IOException,
InterruptedException { // TODO Auto-generated method stub
FSDataInputStream stream = null; try { if (!processed) {
int len = (int) this.fs.getFileStatus(this.path).getLen();
final byte[] data = new byte[len];
stream = this.fs.open(this.path);
int read = stream.read(data);
String content = new String(data, 0, read);
this.content = content;
processed = true;
return true; } } catch (IOException e) { e.printStackTrace(); if (stream != null) {
try {
stream.close();
} catch (IOException ie) {
ie.printStackTrace();
} } } return false; }
@Override public Text getCurrentKey() throws IOException,
InterruptedException { // TODO Auto-generated method stub return
this.key; }
@Override public FileInfoWritable getCurrentValue() throws
IOException, InterruptedException { // TODO Auto-generated method
stub
final FileInfoWritable fileInfo = new FileInfoWritable();
fileInfo.setContent(this.content);
fileInfo.setAuthor(this.author);
fileInfo.setCreatedDate(this.createdDate);
fileInfo.setOwner(this.owner);
fileInfo.setPath(this.path.toString()); return fileInfo; }
@Override public float getProgress() throws IOException,
InterruptedException { // TODO Auto-generated method stub return
processed ? 1.0f : 0.0f; }
@Override public void close() throws IOException { // TODO
Auto-generated method stub
}
}
包 nk.stackoverflow.spark;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.hadoop.io.Writable;
import com.google.common.base.Charsets;
public class FileInfoWritable implements Writable {
private final static Charset CHARSET = Charsets.UTF_8;
private String createdDate;
private String owner;
// private String lastModified;
private String content;
private String path;
public FileInfoWritable() {
}
public void readFields(DataInput in) throws IOException {
this.createdDate = readString(in);
this.owner = readString(in);
// this.lastModified = readString(in);
this.content = readString(in);
this.path = readString(in);
}
public void write(DataOutput out) throws IOException {
writeString(createdDate, out);
writeString(owner, out);
// writeString(lastModified, out);
writeString(content, out);
writeString(path, out);
}
private String readString(DataInput in) throws IOException {
final int n = in.readInt();
final byte[] content = new byte[n];
in.readFully(content);
return new String(content, CHARSET);
}
private void writeString(String str, DataOutput out) throws IOException {
out.writeInt(str.length());
out.write(str.getBytes(CHARSET));
}
public String getCreatedDate() {
return createdDate;
}
public void setCreatedDate(String createdDate) {
this.createdDate = createdDate;
}
public String getAuthor() {
return owner;
}
public void setAuthor(String author) {
this.owner = author;
}
/*public String getLastModified() {
return lastModified;
}*/
/*public void setLastModified(String lastModified) {
this.lastModified = lastModified;
}*/
public String getOwner() {
return owner;
}
public void setOwner(String owner) {
this.owner = owner;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
}
包 nk.stackoverflow.spark;
import org.apache.hadoop.conf.Configuration; import
org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import
org.apache.spark.api.java.JavaPairRDD; import
org.apache.spark.api.java.JavaSparkContext; import
org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class CustomInputFormat { public static void main(String[]
args) {
SparkConf conf = new SparkConf();
conf.setAppName(args[0]);
conf.setMaster("local[*]");
final String inputPath = args[1];
JavaSparkContext sc = new
JavaSparkContext(conf);
JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(inputPath, RichFileInputFormat.class,
Text.class,
FileInfoWritable.class, new Configuration());
rdd.foreach(new VoidFunction<Tuple2<Text, FileInfoWritable>>() {
public void call(Tuple2<Text, FileInfoWritable> t) throws
Exception {
final Text filePath = t._1();
final String fileContent = t._2().getContent();
System.out.println("file " + filePath + " has contents= " + fileContent); } });
sc.close(); } }
关于java - 获取 Spark 加载的文件的详细信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38118462/
目录 进程 其他相关概念 创建线程的两种方式 为什么使用start()方法而不直接使用run()方法 start()方法底层
CURL状态码列表 状态码 状态原因 解释 0 正常访问
ODBC连接类函数 odbc_connect函数:打开一个ODBC连接 odbc_close函数:关闭一个已经打开的ODBC连接 odbc_close_all函数:关闭所有已经打开的ODBC连
作为标题,如何计算从纪元1900到现在使用boost的持续时间? 编辑:很抱歉以前的问题太短。我将再次描述我的问题。 我有关于将生日另存为整数的问题。我创建了四个函数,用法如下: ptime转换为整数
前言 在Java中,有一个常被忽略 但 非常重要的关键字Synchronized今天,我将详细讲解 Java关键字Synchronized的所有知识,希望你们会喜欢 目录 1. 定义 J
详细 JVM 垃圾收集日志的时间戳是收集的开始还是结束? 2016-08-09T21:04:19.756-0400: 224890.317: [GC Desired survivor size 167
我在“Master-Detail”概念上苦苦挣扎,除了一点点(但很重要)的细微差别外,几乎所有东西都按预期工作。我应该在 Storyboard上更改什么以在详细信息 View (屏幕截图底部的右上角)
我希望能够显示表格的详细 View ,但不推送新屏幕,而只显示表格所在的详细 View 。 设置它的最佳方式是什么......如果真的可行的话? ---------------------------
我在我的博客中为我的帖子使用了详细 View ,每篇帖子都有评论,所以我想对它们进行分页,但我不知道该怎么做,因为我请求了帖子模型。我知道如何在功能 View 中执行此操作,但不知道如何在详细 Vie
在下面的代码中,与 pm 对齐,该行是否会 move 整个内存并将其分配给 pm,或者它只会 move p 指向的内存而不是整个数组? int main() { int*
1·下载 https://dev.mysql.com/downloads/mysql/ 2·安装服务 1)管理员运行cmd 2)D: 3)cd D:\mysql
今天以前一直用的SQL Server 2005做开发,偶尔也用MySQL,现入手公司项目,用到SQL Server 2008,于是乎必须安装它,免得出现其他很纠结的小问题,现将自己安装图解分享如下:
1. crontab命令选项 复制代码 代码如下: #crontab -u <-l, -r, -e> -u指定一个用
我们有一个 WPF 应用程序,它有一个主窗口/详细信息窗口,两者都是 WPF 数据网格。当您在上部数据网格中选择一行时,详细信息将显示在下部数据网格中。我想知道从 UI 的角度来看是否有任何关于如何处
在可视化 Perforce 客户端 (p4v) 中有一个选项: 显示文件操作的 p4 命令输出 启用后,在日志 Pane 中,我可以看到这样的详细日志记录: p4 sync /Users/az/ftp
在其他服务器上设置测试环境后,在几个API调用中出现错误。 99%肯定这是MySQL的事情,但是返回的错误根本没有帮助: global name 'sys' is not defined" 我已经导入
我正在维护一个通用的 iOS 应用程序,其开发已开始于 iOS 6。我正在为 iOS 7 更新 UI。现在我遇到了应用程序的 iPad 部分的奇怪问题。这部分遵循使用 UISplitViewContr
我希望我能正确描述这种情况。当它发生时很容易在屏幕上看到,但很难用语言解释,但我会尽力而为。 我有一个带有固定主视图 (UITableView) 和两个详细 View 之一的 UISplitViewC
我尝试在 eclipse 和 intelliJ 参数中使用垃圾收集记录器来配置简单的测试程序。尝试了不同类型的配置,但尚未创建日志文件。 -XX:+PrintGCDetails -XX:+PrintG
正如您所知,.cap 文件中的 java 小程序的输出文件格式必须通过智能卡读卡器/写卡器(如 ACR122 或任何其他读卡器)部署到 java 卡,而且我相信 java 卡与 java 卡之间的部署
我是一名优秀的程序员,十分优秀!