我构建了一个基本的web解析器,它使用hadoop将url传递给多个线程。在我到达输入文件的末尾之前,hadoop会在仍有线程运行的情况下声明自己已经完成了。这将导致错误org.apache.hadoop.fs.fserror:java.io.ioexception:stream closed。不管怎样,是否有足够长的时间来保持流的开放以便线程结束(我可以合理准确地预测线程在单个url上花费的最大时间)。
下面是我如何执行线程
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
private Text word = new Text();
private URLPile pile = new URLPile();
private MSLiteThread[] Threads = new MSLiteThread[16];
private boolean once = true;
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter) {
String url = value.toString();
StringTokenizer urls = new StringTokenizer(url);
Config.LoggerProvider = LoggerProvider.DISABLED;
System.out.println("In Mapper");
if (once) {
for (MSLiteThread thread : Threads) {
System.out.println("created thread");
thread = new MSLiteThread(pile);
thread.start();
}
once = false;
}
while (urls.hasMoreTokens()) {
try {
word.set(urls.nextToken());
String currenturl = word.toString();
pile.addUrl(currenturl, output);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
线程本身得到如下url
public void run(){
try {
sleep(3000);
while(!done()){
try {
System.out.println("in thread");
MSLiteURL tempURL = pile.getNextURL();
String currenturl = tempURL.getURL();
urlParser.parse(currenturl);
urlText.set("");
titleText.set(currenturl+urlParser.export());
System.out.println(urlText.toString()+titleText.toString());
tempURL.getOutput().collect(urlText, titleText);
pile.doneParsing();
sleep(30);
} catch (Exception e) {
pile.doneParsing();
e.printStackTrace();
continue;
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Thread done");
}
并介绍了urlpile中的相关方法
public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException {
while(queue.size()>16){
System.out.println("queue full");
wait();
}
finishedParcing--;
queue.add(new MSLiteURL(output,url));
notifyAll();
}
private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>();
private int sent = 0;
private int finishedParcing = 0;
public synchronized MSLiteURL getNextURL() throws InterruptedException {
notifyAll();
sent++;
//System.out.println(queue.peek());
return queue.remove();
}
1条答案
按热度按时间qnyhuwrf1#
正如我从下面的注解中推断出的那样,您可以在map()函数的每个部分中这样做,以使事情变得简单。我看到您执行了以下操作,以预创建一些空闲线程。您可以将以下代码移动到
到,
所以,只要初始化一次,就不再需要“once”条件检查了。
此外,不需要像上面那样生成空闲线程。我不知道创建16个空闲线程可以获得多少性能提升。
不管怎样,这里有一个解决方案(虽然可能并不完美)
你可以使用倒计时锁来读取更多信息,以批量处理你的url,直到它们完成为止。这是因为,如果您将每个传入的url记录释放到一个线程,那么将立即获取下一个url,并且当您以相同的方式处理最后一个url时,map()函数将返回,即使您在队列中还有线程要处理。你将不可避免地得到你提到的例外。
下面是一个示例,说明如何使用倒计时闩锁进行阻止。
最后,在urlprocessingthread中,一旦url被处理,就减少闩锁计数器,
你的代码可能有问题:在
pile.addUrl(currenturl, output);
,当您添加一个新的url时,同时所有16个线程都将得到更新(我不太确定),因为相同的pile对象被传递给16个线程。有一个机会,你的网址得到重新处理,或者你可能会得到一些其他的副作用(我不是很确定)。其他建议:
另外,您可能需要使用
mapred.task.timeout
(默认值为600000毫秒)=10分钟
描述:如果任务既不读取输入,也不写入输出,也不更新其状态字符串,则任务终止前的毫秒数。
可以在mapred-site.xml中添加/重写此属性