gpt4 book ai didi

java - 在Hadoop/层叠中从FTP服务器读取数据

转载 作者:行者123 更新时间:2023-12-02 21:36:28 26 4
gpt4 key购买 nike

我想从FTP服务器读取数据,我以ftp://Username:Password@host/path格式提供驻留在FTP服务器上的文件的路径。
当我使用map reduce程序从文件中读取数据时,它可以正常工作。我想通过Cascading框架从同一文件读取数据。我正在使用Hfs级联框架的水龙头来读取数据。它引发以下异常

java.io.IOException: Stream closed
at org.apache.hadoop.fs.ftp.FTPInputStream.close(FTPInputStream.java:98)
at java.io.FilterInputStream.close(Unknown Source)
at org.apache.hadoop.util.LineReader.close(LineReader.java:83)
at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:254)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:440)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

下面是我从中读取文件的级联框架的代码:
public class FTPWithHadoopDemo {
public static void main(String args[]) {
Tap source = new Hfs(new TextLine(new Fields("line")), "ftp://user:pwd@xx.xx.xx.xx//input1");
Tap sink = new Hfs(new TextLine(new Fields("line1")), "OP\\op", SinkMode.REPLACE);
Pipe pipe = new Pipe("First");
pipe = new Each(pipe, new RegexSplitGenerator("\\s+"));
pipe = new GroupBy(pipe);
Pipe tailpipe = new Every(pipe, new Count());
FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(tailpipe, sink);
new HadoopFlowConnector().connect(flowDef).complete();
}
}

我试图在Hadoop源代码中查找相同的异常。我发现在MapTask类中,有一种方法runOldMapper处理流。并且在同一方法中,最后有一个块将流关闭 (in.close())。当我从finally块中删除该行时,它工作正常。下面是代码:
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
throws IOException, InterruptedException, ClassNotFoundException {
InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());

updateJobWithSplit(job, inputSplit);
reporter.setInputSplit(inputSplit);

RecordReader<INKEY, INVALUE> in = isSkipping()
? new SkippingRecordReader<INKEY, INVALUE>(inputSplit, umbilical, reporter)
: new TrackedRecordReader<INKEY, INVALUE>(inputSplit, job, reporter);
job.setBoolean("mapred.skip.on", isSkipping());

int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
MapOutputCollector collector = null;
if (numReduceTasks > 0) {
collector = new MapOutputBuffer(umbilical, job, reporter);
} else {
collector = new DirectMapOutputCollector(umbilical, job, reporter);
}
MapRunnable<INKEY, INVALUE, OUTKEY, OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(),
job);

try {
runner.run(in, new OldOutputCollector(collector, conf), reporter);
collector.flush();
} finally {
// close
in.close(); // close input
collector.close();
}
}

请协助我解决这个问题。

谢谢,
阿尔沙达利

最佳答案

经过一些努力,我发现hadoop对FTP使用org.apache.hadoop.fs.ftp.FTPFileSystem类。
此类不支持搜索,即从文件开头搜索到给定的偏移量。在一个块中读取数据,然后文件系统寻求下一个块进行读取。 FTPFileSystem的默认块大小为4KB。由于不支持搜寻,因此它只能读取小于或等于4KB的数据。

关于java - 在Hadoop/层叠中从FTP服务器读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31858063/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com