accumulo mapreduce作业因java.io.eofexception而失败,使用accumulorowinputformat

q9yhzks0  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(394)

我所有的Map器都失败了,只有下面的例外。我只是在简洁方面表现出最后的失败。
为什么会发生这种情况?我该如何解决?

16/09/21 17:01:57 INFO mapred.JobClient: Task Id : attempt_201609151451_0044_m_000002_2, Status : FAILED
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.accumulo.core.client.mapreduce.RangeInputSplit.readFields(RangeInputSplit.java:154)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:356)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:640)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.ap
16/09/21 17:02:00 INFO mapred.JobClient: Job complete: job_201609151451_0044
16/09/21 17:02:00 INFO mapred.JobClient: Counters: 8
16/09/21 17:02:00 INFO mapred.JobClient:   Job Counters
16/09/21 17:02:00 INFO mapred.JobClient:     Failed map tasks=1
16/09/21 17:02:00 INFO mapred.JobClient:     Launched map tasks=48
16/09/21 17:02:00 INFO mapred.JobClient:     Data-local map tasks=13
16/09/21 17:02:00 INFO mapred.JobClient:     Rack-local map tasks=35
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=343982
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

我使用accumulo表作为输入数据。我的设置如下:

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String idMapFileContent = readResourceFile(TYPE_ID_MAP_FILENAME);
    conf.set(TYPE_ID_MAP_KEY, idMapFileContent);

    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(DanglingLinksFinderMapper.class);
    job.setReducerClass(DanglingLinksFinderReducer.class);
    this.setupRowInputFormat(job);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path out = new Path(args[0]);
    LOGGER.info("Writing to output directory: " + out.toUri());
    FileOutputFormat.setOutputPath(job, out);

    int exitCode = job.waitForCompletion(true) ? 0 : 1;
}

private Job setupRowInputFormat(Job job)
        throws IOException, AccumuloSecurityException
{
    job.setInputFormatClass(AccumuloRowInputFormat.class);
    Configuration conf = job.getConfiguration();

    AccumuloConnectInfo connectInfo = new AccumuloConnectInfo(conf);
    LOGGER.info(connectInfo.toString());

    AccumuloRowInputFormat.setZooKeeperInstance(job, connectInfo.getInstanceNames(), connectInfo.getZookeeperInstanceNames());
    AccumuloRowInputFormat.setConnectorInfo(job, connectInfo.getUserName(), connectInfo.getPassword());
    AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
    AccumuloRowInputFormat.setInputTableName(job, TABLE_NAME);
    return job;
}

我使用的是Hadoop2.6.0、Accumulo1.5.0和Java1.7。
前几天我做了这个,据我所知,没有任何改变。所以我在想,它可能与我运行它的服务器上的配置或数据状态有关?该作业在本地计算机上docker容器中运行的测试表上运行正常,但在远程测试服务器上失败。
我可以登录 accumulo shell 扫描我的table。那里一切看起来都很好。我还尝试在测试服务器上运行压缩,虽然效果很好,但没有解决问题。

e5nszbig

e5nszbig1#

我猜,您用于启动mapreduce作业的acumulo jar与通过distributedcache或libjars cli选项用于作业本身的acumulo jar(Map器/还原器)的版本不匹配。
由于未指定任何范围,AccuMuloinInputFormat将自动获取表的所有数字化仪边界,并创建与表中数字化仪数量相同的rangeinputsplit对象。这种分割创建是在本地jvm(提交作业时创建的jvm)中完成的。这些rangeinputsplit对象被序列化并传递到yarn中。
您提供的错误是当Map程序获取其中一个序列化的rangeinputsplit对象并尝试对其进行反序列化时。有些情况下,这是失败的,因为没有足够的序列化数据来反序列化Map程序中运行的accumulo版本期望读取的内容。
可能这只是accumulo版本中的序列化错误(请分享),但我不记得听说过这样的错误。我猜accumulo在本地类路径和Map器的类路径上的版本是不同的。

相关问题