
hadoop - Hadoop Pig output directory not set

Reposted. Author: 行者123  Updated: 2023-12-02 20:11:41

I am writing my own Pig Store class. I don't want to store the output in a file; I intend to send it to a third-party data store (the API calls are omitted here).

Note: I am running this on Cloudera's VirtualBox image.

I have written my Java class (listed below) and created mystore.jar, which is used by the id.pig script below:

store B INTO 'mylocation' USING MyStore('mynewlocation')

When I run this script with Pig, I get the following error:

ERROR 6000:
Output location validation failed: 'file://home/cloudera/test/id.out More info to follow:
Output directory not set.
org.apache.pig.impl.plan.VisitorException: ERROR 6000:
at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)

Please help!

-------------------- MyStore.java ----------------------
public class MyStore extends StoreFunc {
    protected RecordWriter writer = null;
    private String location = null;

    public MyStore() {
        location = null;
    }

    public MyStore(String location) {
        this.location = location;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStoreOutputFormat(location);
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        // write tuple to location
        try {
            writer.write(null, tuple.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        if (location != null)
            this.location = location;
    }
}

-------------------- MyStoreOutputFormat.java ----------------------
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class MyStoreOutputFormat extends
        TextOutputFormat<WritableComparable, Tuple> {
    private String location = null;

    public MyStoreOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();

        String extension = location;
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);

        FSDataOutputStream fileOut = fs.create(file, false);

        return new MyStoreRecordWriter(fileOut);
    }

    protected static class MyStoreRecordWriter extends
            RecordWriter<WritableComparable, Tuple> {

        DataOutputStream out = null;

        public MyStoreRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext taskContext) throws IOException,
                InterruptedException {
            // close the location
        }

        @Override
        public void write(WritableComparable key, Tuple value)
                throws IOException, InterruptedException {
            // write the data to location
            if (out != null) {
                out.writeChars(value.toString()); // will be calling an API later; for now, dump to the location
            }
        }
    }
}

Am I missing something here?

Best Answer

First, I think you should store the location value in the Job configuration rather than in an instance variable.

The assignment to the instance variable 'location' in the setStoreLocation method happens while the job is being planned, but the call to getOutputFormat may not occur until the execution phase, by which time the location variable may no longer be set (a new instance of the class may have been created).

If you look at the source of PigStorage.setStoreLocation, you'll notice that it stores the location in the Job configuration (second line):

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));

    if ("true".equals(job.getConfiguration().get("output.compression.enabled"))) {
        FileOutputFormat.setCompressOutput(job, true);
        String codec = job.getConfiguration().get("output.compression.codec");
        try {
            FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(codec));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + codec);
        }
    } else {
        // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
        setCompression(new Path(location), job);
    }
}

So I think you should store the location in the job configuration:

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location != null)
        job.getConfiguration().set("mylocation", location);
}

Your custom output format can then extract it in its getRecordWriter method:
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {

    Configuration conf = job.getConfiguration();

    String extension = conf.get("mylocation");
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);

    FSDataOutputStream fileOut = fs.create(file, false);

    return new MyStoreRecordWriter(fileOut);
}

Finally (and probably the actual cause of the error you're seeing), your output format extends TextOutputFormat, and your record writer uses the getDefaultWorkFile method. That method needs to know the HDFS location the files are being output to, but you never call FileOutputFormat.setOutputPath(job, new Path(location)); in your setStoreLocation method (see the PigStorage.setStoreLocation method I pasted earlier). The error occurs because Hadoop doesn't know where to create the default work file.
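Putting both fixes together, a corrected setStoreLocation for MyStore might look like the following sketch. This is an illustrative combination of the two points above, assuming the "mylocation" configuration key used earlier in this answer:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: both the config key name and the guard are illustrative.
@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location != null) {
        // Persist the location in the job configuration so it survives
        // re-instantiation of the StoreFunc between planning and execution.
        job.getConfiguration().set("mylocation", location);

        // Tell Hadoop where output goes; getDefaultWorkFile relies on this
        // path, and omitting it triggers the "Output directory not set" error.
        FileOutputFormat.setOutputPath(job, new Path(location));
    }
}
```

With this in place, getRecordWriter can read "mylocation" from the configuration instead of relying on the instance field.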

Regarding "hadoop - Hadoop Pig output directory not set", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/14720044/
