
Hadoop fails to copy input bz2 files from S3

Reposted. Author: 可可西里. Updated: 2023-11-01 14:58:55

I have a map-only Hadoop job running on Amazon EMR with the latest AMI version, 3.0.4. Occasionally I get an exception like this:

Error: com.amazonaws.AmazonClientException: Unable to verify integrity of data download.  Client calculated content length didn't match content length received from Amazon S3.  The data may be corrupt.
at com.amazonaws.util.ContentLengthValidationInputStream.validate(ContentLengthValidationInputStream.java:144)
at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:81)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.read(EmrFileSystem.java:289)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:149)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:866)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:423)
at org.apache.hadoop.io.compress.BZip2Codec.read(BZip2Codec.java:483)
at java.io.InputStream.read(InputStream.java:101)

at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:164)
at org.apache.hadoop.mapred.MapTask.nextKeyValue(MapTask.java:544)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
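
For context, the top frames of the trace show a stream wrapper that counts the bytes it hands out and compares the total with the length S3 announced. The sketch below illustrates that kind of check in plain Java. `LengthCheckingInputStream` and its fields are hypothetical names made up for illustration; this is not the AWS SDK's actual implementation, and it ignores `skip()` for brevity.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical illustration of a content-length check; not the AWS SDK class. */
final class LengthCheckingInputStream extends FilterInputStream {
    private final long expectedLength; // e.g. the announced Content-Length
    private long bytesRead = 0;        // bytes actually delivered so far

    LengthCheckingInputStream(InputStream in, long expectedLength) {
        super(in);
        this.expectedLength = expectedLength;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b >= 0) {
            bytesRead++;
        } else {
            validate(); // end of stream: compare the counts
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            bytesRead += n;
        } else if (n < 0) {
            validate(); // end of stream: compare the counts
        }
        return n;
    }

    /** Fails if the stream ended before (or after) the expected byte count. */
    private void validate() throws IOException {
        if (bytesRead != expectedLength) {
            throw new IOException("Expected " + expectedLength
                    + " bytes but the stream ended after " + bytesRead);
        }
    }
}
```

Under this reading, a connection that closes early looks like a normal end of stream to the bzip2 decoder, and only the length comparison reveals that data is missing.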

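Is there any way to fix this? Why does it happen? Is it a network problem on Amazon's side? The input files themselves cannot be the problem, because rerunning the same job usually succeeds. Is there a way to catch this exception? Why doesn't Hadoop recover from it automatically?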
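
On the "catch this exception" part: `AmazonClientException` is an unchecked exception, so code that opens and reads a stream itself could in principle retry the whole read. The sketch below is a generic plain-Java retry helper; `TransientRetry`, `withRetries`, and the linear backoff are hypothetical choices for illustration, not a Hadoop or AWS API. It does not help directly with the trace above, where the failure surfaces inside `LineRecordReader` in the middle of a stream; there, recovery is left to Hadoop's task-attempt retries (the `mapreduce.map.maxattempts` setting in Hadoop 2).

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: re-runs an action when it fails with an unchecked exception. */
final class TransientRetry {
    private TransientRetry() { }

    /**
     * Calls the action up to maxAttempts times. A RuntimeException triggers
     * another attempt after a linearly growing pause; checked exceptions
     * propagate immediately. The last RuntimeException is rethrown if every
     * attempt fails.
     */
    static <T> T withRetries(Callable<T> action, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }
}
```

The action passed in has to restart from scratch on each attempt (reopen the file, re-read from the beginning), since a half-consumed stream cannot simply be resumed.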

My main class looks like this:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class LogParserMapReduce extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(LogParserMapReduce.class);

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();

        // Old-style property names; Hadoop 2 still accepts them as deprecated
        // aliases of mapreduce.map.output.compress and
        // mapreduce.map.output.compress.codec.
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setBoolean("keep.failed.task.files", true);

        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = Job.getInstance(conf);

        /*
         * The expected command-line arguments are the paths containing
         * input and output data. Stop if the number of command-line
         * arguments is not exactly 2. Returning (rather than calling
         * System.exit) leaves the exit code to whoever invoked run().
         */
        if (args.length != 2) {
            System.err.println("Usage: LogParserMapReduce <input dir> <output dir>");
            return -1;
        }

        /*
         * Specify the jar file that contains your driver and mapper.
         * Hadoop will transfer this jar file to the nodes in your cluster
         * that run the map tasks.
         */
        job.setJarByClass(LogParserMapReduce.class);

        /*
         * Specify an easily-decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("LogParser");

        /*
         * Specify the paths to the input and output data based on the
         * command-line arguments. The output is gzip-compressed.
         */
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        /*
         * Specify the mapper class. This is a map-only job, so there is
         * no reducer class.
         */
        job.setMapperClass(LogParserMapper.class);

        /*
         * The input and output files are in text format, the default
         * format. In text format files, each record is a line delimited
         * by a line terminator.
         *
         * When you use other input formats, you must call the
         * setInputFormatClass method. When you use other
         * output formats, you must call the setOutputFormatClass method.
         */

        /*
         * With zero reduce tasks, the mapper's output is the job's output,
         * so only the job's output key and value classes are set below.
         *
         * In a job with a reducer whose map output types differ from the
         * final output types, you must also call the
         * setMapOutputKeyClass and setMapOutputValueClass methods.
         */

        /*
         * Specify the job's output key and value classes.
         */
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Map-only job: no reduce phase.
        job.setNumReduceTasks(0);

        LOG.info("LogParserMapReduce: waitingForCompletion");
        /*
         * Start the MapReduce job and wait for it to finish.
         * If it finishes successfully, return 0. If not, return 1.
         */
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

}

Best answer

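The solution turned out to be very simple (once Amazon's customer support told me): I had to upgrade to the latest AMI (currently 3.1.0), which ships the latest Hadoop (2.4), and make sure that I compile my Java code against that same Hadoop version. I have not seen this problem since.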
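
One way to guard against a compile-time/runtime mismatch is to compare version strings when the driver starts. The sketch below is plain Java and rests on assumptions: `HadoopVersionCheck` and its methods are hypothetical names, the version strings are examples, and treating "same major.minor" as "same version" is a simplification. On a cluster, the runtime string could come from Hadoop's `org.apache.hadoop.util.VersionInfo.getVersion()`, while the compile-time string would have to be recorded by your build.

```java
/** Hypothetical helper for comparing Hadoop version strings. */
final class HadoopVersionCheck {
    private HadoopVersionCheck() { }

    /** Extracts "major.minor" from strings such as "2.4.0" or "2.4.0-SNAPSHOT". */
    static String majorMinor(String version) {
        String[] parts = version.trim().split("[.-]");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unrecognized version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** True when both strings name the same major.minor release line. */
    static boolean sameRelease(String compiledAgainst, String runtime) {
        return majorMinor(compiledAgainst).equals(majorMinor(runtime));
    }
}
```

For example, `HadoopVersionCheck.sameRelease("2.2.0", "2.4.0")` returns false, which the driver could log as a warning before submitting the job.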

Regarding "Hadoop fails to copy input bz2 files from S3", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/23257043/
