I'm new to Hadoop, and I'm trying to implement an algorithm that simply counts the number of occurrences of substrings of length x. The explanation is long, but the algorithm itself is simple.
Here is a worked example of an input: "ABCABCAGD", x=4, m=2
Map
Extraction of the substrings of length x (we call them x-strings):
ABCA, BCAB, CABC, ABCA, BCAG, CAGD
For each x-string I extract its "signature", defined as its lexicographically smallest substring of length m:
AB, AB, AB, AB, AG, AG
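To make the signature step concrete, here is a minimal Java sketch (illustrative only, not my actual code; the helper name signature is made up):

static String signature(String xString, int m) {
    // the signature is the lexicographically smallest substring of length m
    String best = xString.substring(0, m);
    for (int i = 1; i + m <= xString.length(); i++) {
        String candidate = xString.substring(i, i + m);
        if (candidate.compareTo(best) < 0) {
            best = candidate;
        }
    }
    return best;
}

For example, signature("BCAG", 2) compares BC, CA and AG, and returns AG.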
Now, for each "signature", I generate another string as follows: I concatenate the x-strings that have the same signature and are consecutive. In this example there are two signatures, AB and AG. The x-strings belonging to each of these signatures are consecutive, so the output of my map task is:
Key=AB; Value=ABCABCA
Key=AG; Value=BCAGD
(As you can see, when two x-strings are consecutive I only append the last character of the second one, so the first value is ABCA + B + C + A.)
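The whole map step can be sketched like this (again illustrative Java, building on the signature helper above; my real extractSuperKmersNew works on byte arrays and writes to the Hadoop context instead of returning a list):

import java.util.ArrayList;
import java.util.List;

// Groups consecutive x-strings that share a signature and emits
// one (signature, concatenated run) pair per run.
static List<String[]> toSuperKmers(String input, int x, int m) {
    List<String[]> pairs = new ArrayList<>();
    String currentSig = null;
    int runStart = 0; // index where the current run of x-strings begins
    for (int i = 0; i + x <= input.length(); i++) {
        String sig = signature(input.substring(i, i + x), m);
        if (currentSig == null) {
            currentSig = sig;
            runStart = i;
        } else if (!sig.equals(currentSig)) {
            // the previous run ends with the x-string starting at i - 1
            pairs.add(new String[] { currentSig, input.substring(runStart, i - 1 + x) });
            currentSig = sig;
            runStart = i;
        }
    }
    if (currentSig != null) {
        pairs.add(new String[] { currentSig, input.substring(runStart) });
    }
    return pairs;
}

On "ABCABCAGD" with x=4 and m=2 this produces exactly the two pairs shown above: (AB, ABCABCA) and (AG, BCAGD).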
Combiner
Now, in the combiner, I extract the x-strings again from the map output, so my combiner output is:
Key=ABCA,Value=1
Key=BCAB,Value=1
Key=CABC,Value=1
Key=ABCA,Value=1
(belonging to the first map output -> Key=AB; Value=ABCABCA)
Key=BCAG,Value=1
Key=CAGD,Value=1
(belonging to the second map output -> Key=AG; Value=BCAGD)
Reducer
Now I should simply count the occurrences of each x-string (yes, that is all the algorithm does). This should be the reduce output:
ABCA:2
BCAB:1
CABC:1
BCAG:1
CAGD:1
The problem is that the actual output is:
ABCA:1
ABCA:1
BCAB:1
CABC:1
BCAG:1
CAGD:1
My reducer is currently very similar to WordCount: it just iterates over the values and sums them up. I'm fairly sure the reduce task (I configured the job with setNumReduceTasks(1)) is somehow producing the wrong output, because it is not grouping all the data together.
What do you think of this structure? I chose to extract the x-strings in the combiner step; is that the right place, or is it part of the problem I'm seeing? Please note: because of my algorithm, the combiner emits more output records than it receives as input... is that a problem?
Code (simplified; the non-Hadoop logic is omitted)
public class StringReader extends Mapper<NullWritable, RecordInterface, LongWritable, BytesWritable> {
    public void map(NullWritable key, RecordInterface value, Context context) throws IOException, InterruptedException {
        HadoopRun.util.extractSuperKmersNew(value.getValue().getBytes(), context);
    }
}
public void extractSuperKmersNew(byte[] r1, Mapper<NullWritable, RecordInterface, LongWritable, BytesWritable>.Context context) {
    ....
    context.write(new LongWritable(current_signature), new BytesWritable(super_kmer));
    ....
}
public class Combiner extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {
    protected void reduce(LongWritable arg0, Iterable<BytesWritable> arg1,
            Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context)
            throws IOException, InterruptedException {
        for (BytesWritable val : arg1) {
            // copyBytes() returns exactly getLength() bytes; the deprecated get()
            // returns the internal buffer, which may be padded beyond the valid data
            extractKmers(val.copyBytes(), context);
        }
    }
}
public void extractKmers(byte[] superkmer, Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context arg2) {
    int end = superkmer.length - k + 1;
    // extraction of the k-strings from the aggregated strings
    for (int i = 0; i < end; i++) {
        long l = byteArrayToLong(superkmer, i);
        try {
            // quick fix to send Key = k-string, Value = 1 to the reducer
            byte[] ONE = { 1 };
            arg2.write(new LongWritable(l), new BytesWritable(ONE));
        } catch (IOException | InterruptedException e) {
            // swallowed for brevity; should at least be logged
        }
    }
}
public class CounterReducer extends Reducer<LongWritable, BytesWritable, Text, IntWritable> {
    protected void reduce(LongWritable kmer, Iterable<BytesWritable> count,
            Reducer<LongWritable, BytesWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (BytesWritable val : count) {
            sum += 1;
        }
        context.write(new Text(LongWritableToText(kmer)), new IntWritable(sum));
    }
}
public class HadoopRun extends Configured implements Tool {
    public static Utility util;

    public int run(String[] args) throws Exception {
        /* HADOOP START */
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "Mapping Strings");
        job.setJarByClass(HadoopRun.class);
        job.setMapperClass(StringReader.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(CounterReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setPartitionerClass(KPartitioner.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(FASTAShortInputFileFormat.class);
        FASTAShortInputFileFormat.addInputPath(job, new Path(conf.get("file_in")));
        FileOutputFormat.setOutputPath(job, new Path(conf.get("file_out")));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //...
        // managing input arguments
        //...
        CommandLineParser parser = new BasicParser();
        HelpFormatter formatter = new HelpFormatter();
        try {
            cmd = parser.parse(options, args);
        } catch (ParseException e) {
            formatter.printHelp("usage:", options);
            System.exit(1);
        }
        Integer k = Integer.parseInt(cmd.getOptionValue("k"));
        Integer m = Integer.parseInt(cmd.getOptionValue("m"));
        String file_in_string = cmd.getOptionValue("file_in");
        String file_out_string = cmd.getOptionValue("file_out");
        Configuration conf = new Configuration();
        conf.set("file_in", file_in_string);
        conf.set("file_out", file_out_string);
        util = new Utility(k, m);
        int res = ToolRunner.run(conf, new HadoopRun(), args);
        System.exit(res);
    }
}
1 Answer
If you want a method that counts the occurrences of every substring of length x: such a method returns a HashMap that looks somewhat like the format you used in your question, where each key is a substring and each value is the number of occurrences of that substring.
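Something along these lines (a minimal sketch; the class and method names are my own, so adapt them as needed):

import java.util.HashMap;
import java.util.Map;

public class SubstringCounter {
    // Counts every substring of length x in 'input'.
    // Returns a map from substring to number of occurrences, e.g.
    // countSubstrings("ABCABCAGD", 4) -> {ABCA=2, BCAB=1, CABC=1, BCAG=1, CAGD=1}
    public static Map<String, Integer> countSubstrings(String input, int x) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i + x <= input.length(); i++) {
            String sub = input.substring(i, i + x);
            counts.merge(sub, 1, Integer::sum);
        }
        return counts;
    }
}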