hadoop无法从s3复制输入bz2文件

zbwhf8kr  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(373)

我有一个仅含map阶段(map-only)的Hadoop作业,运行在Amazon EMR上,使用最新的AMI版本3.0.4。偶尔会出现这样的异常:

Error: com.amazonaws.AmazonClientException: Unable to verify integrity of data download.  Client calculated content length didn't match content length received from Amazon S3.  The
data may be corrupt.
    at com.amazonaws.util.ContentLengthValidationInputStream.validate(ContentLengthValidationInputStream.java:144)
    at com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:81)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.read(EmrFileSystem.java:289)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:866)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:423)
    at org.apache.hadoop.io.compress.BZip2Codec.read(BZip2Codec.java:483)
    at java.io.InputStream.read(InputStream.java:101)

    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:164)
    at org.apache.hadoop.mapred.MapTask.nextKeyValue(MapTask.java:544)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

有什么办法能解决这个问题吗?为什么会这样?是Amazon的网络问题吗?输入文件本身应该没有问题,因为重新运行同一个作业通常会成功。有没有办法捕获这个异常?为什么Hadoop不能自动恢复?
我的主类(main class)如下:

/**
 * Driver for the map-only "LogParser" MapReduce job.
 *
 * <p>Expects exactly two command-line arguments: the input path(s) and the
 * output directory. The job runs {@code LogParserMapper} with zero reduce
 * tasks and writes gzip-compressed output.
 */
public class LogParserMapReduce extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LogParserMapReduce.class);

  /** Value returned by {@link #run(String[])} when the command line is malformed. */
  private static final int USAGE_ERROR = -1;

  /**
   * Configures and runs the job, blocking until it finishes.
   *
   * @param args {@code args[0]} is the input path specification and
   *     {@code args[1]} is the output directory
   * @return 0 if the job succeeded, 1 if it failed, or -1 if the arguments
   *     were invalid
   * @throws Exception if job setup or submission fails
   */
  @Override
  public int run(String[] args) throws Exception {
    /*
     * Validate the command line before doing any job setup. Report the
     * problem through the return value instead of System.exit so the
     * caller decides how the process terminates.
     */
    if (args == null || args.length != 2) {
      System.err.println("Usage: LogParserMapReduce <input dir> <output dir>");
      return USAGE_ERROR;
    }

    Configuration conf = super.getConf();

    /*
     * These settings must be applied before the Job is created from conf.
     */
    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    conf.setBoolean("keep.failed.task.files", true);

    /*
     * Instantiate a Job object for this job's configuration.
     */
    Job job = Job.getInstance(conf);

    /*
     * Specify the jar file that contains the driver and mapper.
     * Hadoop will transfer this jar file to the nodes in the cluster
     * that run the tasks.
     */
    job.setJarByClass(LogParserMapReduce.class);

    /*
     * Specify an easily-decipherable name for the job.
     * This job name will appear in reports and logs.
     */
    job.setJobName("LogParser");

    /*
     * Specify the paths to the input and output data based on the
     * command-line arguments, and gzip-compress the job output.
     */
    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    /*
     * Specify the mapper class. No reducer is configured: the number of
     * reduce tasks is set to 0 below.
     *
     * No input or output format class is set here, so the job uses
     * whatever defaults the framework provides.
     */
    job.setMapperClass(LogParserMapper.class);

    /*
     * Specify the job's output key and value classes.
     */
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(0);

    LOG.info("LogParserMapReduce: waitingForCompletion");
    /*
     * Start the MapReduce job and wait for it to finish.
     * If it finishes successfully, return 0. If not, return 1.
     */
    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

}
brgchamk

brgchamk1#

解决方案非常简单(这是Amazon客户支持告诉我的):我必须升级到最新的AMI(目前是3.1.0),它带有最新的Hadoop(2.4),并且还要确保编译Java代码时使用相同的Hadoop版本。从那以后我就再没遇到过这个问题。

相关问题