我在学习 Hadoop
这个问题让我困惑了一段时间。基本上我在写一本书 SequenceFile
到磁盘,然后读回。然而,每次我得到一个 EOFException
阅读时。深入研究会发现,在写入序列文件时,它会被过早地截断,而且总是在写入索引962之后发生,并且文件的固定大小总是为45056字节。
我正在MacBookPro上使用Java8和Hadoop2.5.1。事实上,我在Java7下的另一台linux机器上尝试了相同的代码,但同样的情况也发生了。
我可以排除书写器/读取器未正确关闭的可能性。我尝试使用旧样式的try/catch和显式writer.close(),如代码所示,还使用了较新的try-with-resource方法。两者都不起作用。
任何帮助都将不胜感激。
下面是我使用的代码:
public class SequenceFileDemo {
private static final String[] DATA = { "One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen" };
public static void main(String[] args) throws Exception {
String uri = "file:///Users/andy/Downloads/puzzling.seq";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
//API change
try {
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
stream(fs.create(path)),
keyClass(IntWritable.class),
valueClass(Text.class));
for ( int i = 0; i < 1024; i++ ) {
key.set( i);
value.clear();
value.set(DATA[i % DATA.length]);
writer.append(key, value);
if ( (i-1) %100 == 0 ) writer.hflush();
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
}
writer.close();
} catch (Exception e ) {
e.printStackTrace();
}
try {
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path));
Class<?> keyClass = reader.getKeyClass();
Class<?> valueClass = reader.getValueClass();
boolean isWritableSerilization = false;
try {
keyClass.asSubclass(WritableComparable.class);
isWritableSerilization = true;
} catch (ClassCastException e) {
}
if ( isWritableSerilization ) {
WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
while(reader.next(rKey, rValue)) {
System.out.printf("[%s] %d %s=%s\n",reader.syncSeen(), reader.getPosition(), rKey, rValue);
}
} else {
//make sure io.seraizliatons has the serialization in use when write the sequence file
}
reader.close();
} catch(IOException e) {
e.printStackTrace();
}
}
}
3条答案
按热度按时间yiytaume1#
我认为您缺少writer.close()在write循环之后。在你开始阅读之前,这应该是最后的冲水。
ohfgkhjo2#
多亏了托马斯。
归根结底,如果作者创造了“拥有”的不是流。在创建writer时,如果传入writer.file(path)选项,writer“拥有”内部创建的底层流,并在调用close()时将其关闭。但是,如果我们传入writer.stream(astream),writer会假定其他人是该流的响应,并且在调用close()时不会关闭它。简言之,这不是一个错误,只是我不太了解它。
wtzytmuj3#
实际上,我发现了这个错误,这是因为您从未关闭中创建的流
Writer.stream(fs.create(path))
.由于某些原因,关闭不会传播到您刚才在那里创建的流。我想这是一个bug,但我现在懒得在jira中查找它。
解决问题的一个方法就是简单地使用
Writer.file(path)
相反。显然,您也可以显式地关闭create流。下面是我的更正示例: