我想使用一个hashset,它在一个文件被Map时存在/工作,然后在下一个文件被Map时被重置/重新创建。我修改了textinputformat以覆盖issplitable以返回false,这样文件就不会被分割,而是由Map程序作为一个整体进行处理。有可能这样做吗?或者有没有其他方法可以减少对accumulo表的写入?
让我从我不相信我想要一个全局变量开始。我只想确保唯一性,从而在accumulo表中写入更少的突变。
我的项目是将shard示例中index.java文件的功能从线性accumulo客户机程序转换为使用mapreduce功能的程序,同时仍然在accumulo中创建相同的表。它需要mapreduce,因为这是一个时髦的词,而且本质上它比针对TB数据的线性程序运行得更快。
以下是索引代码供参考:http://grepcode.com/file/repo1.maven.org/maven2/org.apache.accumulo/examples-simple/1.4.0/org/apache/accumulo/examples/simple/shard/index.java
该程序使用batchwriter将突变写入accumulo,并按文件执行。为了确保它不会写太多的突变,并确保唯一性(尽管我相信accumulo最终会通过压缩合并相同的键),index.java有一个哈希集,用于确定一个单词以前是否遇到过。这都是相对简单的理解。
移动到仅Map的mapreduce作业更为复杂。
这是我在Map方面的尝试,从我看到的accumulo表的部分输出来看,这似乎是一种工作,但是与线性程序index.java相比,运行速度非常慢
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
private HashSet<String> tokensSeen = new HashSet<String>();
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
FileSplit fileSplit = (FileSplit)output.getInputSplit();
System.out.println("FilePath " + fileSplit.getPath().toString());
String filePath = fileSplit.getPath().toString();
filePath = filePath.replace("unprocessed", "processed");
String[] words = value.toString().split("\\W+");
for (String word : words) {
Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
word = word.toLowerCase();
if(!tokensSeen.contains(word)) {
tokensSeen.add(word);
mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
}
try {
output.write(null, mutation);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
慢的问题可能是我在一个测试示例上运行所有这些,一个hadoop的单节点示例,上面有zookeeper和acumulo。如果是这样的话,我只需要找到唯一性的解决方案。
我们非常感谢您提供的任何帮助或建议。
1条答案
按热度按时间8wtpewkr1#
Map程序
setup
以及cleanup
可以重写的方法来更干净地处理这类事情。setup
那就叫一次map
多次调用(每个记录调用一次),然后cleanup
最后被调用一次。我们的想法是在setup
方法,在map
,并在cleanup
,或定期刷新map
如有必要。然而,在迁移到真正的集群之前,几乎肯定不会看到运行时的任何改进。与简单的线性程序相比,单节点测试示例几乎没有任何好处,只不过,一旦获得真正的hadoop集群,同样的代码将运行得更快。