Distributed substring counting

u5rb5r59 · posted 2021-05-29 in Hadoop

I'm new to Hadoop and I'm trying to implement an algorithm that simply counts the occurrences of substrings of length x. It's long to explain but simple.
Here is a concrete example of an input: "ABCABCAGD", with x=4 and m=2.

Map
Extraction of the substrings of length x (let's call them x-strings):

ABCA, BCAB, CABC, ABCA, BCAG, CAGD

For each x-string I extract its "signature", defined as its lexicographically smallest substring of length m:

AB, AB, AB, AB, AG, AG

Now another string is generated for each "signature", as follows:
I concatenate consecutive x-strings that share the same signature. In this example there are two signatures, AB and AG. The x-strings belonging to each of them are consecutive, so the output of my map task is:

Key=AB; Value=ABCABCA
Key=AG; Value=BCAGD

(As you can see, when two x-strings are consecutive I only append the last character of the second one, so the first value is ABCA + B + C + A.)
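
To make the map step concrete, here is a minimal sketch of that logic on plain Strings; my real code works on byte arrays and Hadoop Writables, and signatureOf/mapSide are illustrative names only, not the actual methods:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-String sketch of the map-side logic described above.
class MapSideSketch {

    // signature = lexicographically smallest substring of length m
    static String signatureOf(String xString, int m) {
        String best = xString.substring(0, m);
        for (int i = 1; i <= xString.length() - m; i++) {
            String candidate = xString.substring(i, i + m);
            if (candidate.compareTo(best) < 0) {
                best = candidate;
            }
        }
        return best;
    }

    // groups consecutive x-strings sharing a signature into one concatenated value
    static Map<String, List<String>> mapSide(String s, int x, int m) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        StringBuilder current = null;
        String currentSig = null;
        for (int i = 0; i + x <= s.length(); i++) {
            String xString = s.substring(i, i + x);
            String sig = signatureOf(xString, m);
            if (sig.equals(currentSig)) {
                current.append(xString.charAt(x - 1)); // only the last character is new
            } else {
                if (currentSig != null) {
                    out.computeIfAbsent(currentSig, k -> new ArrayList<>()).add(current.toString());
                }
                currentSig = sig;
                current = new StringBuilder(xString);
            }
        }
        if (currentSig != null) {
            out.computeIfAbsent(currentSig, k -> new ArrayList<>()).add(current.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // prints {AB=[ABCABCA], AG=[BCAGD]} for the example above
        System.out.println(mapSide("ABCABCAGD", 4, 2));
    }
}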
Combiner
Now I extract the x-strings again from the map output, so my combiner output is:

Key=ABCA,Value=1
Key=BCAB,Value=1
Key=CABC,Value=1
Key=ABCA,Value=1

(belonging to the first map output -> Key=AB; Value=ABCABCA)

Key=BCAG,Value=1
Key=CAGD,Value=1

(belonging to the second map output -> Key=AG; Value=BCAGD)
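
The re-extraction in the combiner is just a sliding window over each concatenated value; here is a minimal String-based sketch (the real code slides over a byte[] and emits one (x-string, 1) pair per window position; the class and method names are illustrative):

import java.util.ArrayList;
import java.util.List;

// Plain-String sketch of the combiner-side re-extraction.
class CombinerSketch {

    static List<String> extractXStrings(String superString, int x) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + x <= superString.length(); i++) {
            out.add(superString.substring(i, i + x)); // one x-string per position
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [ABCA, BCAB, CABC, ABCA], matching the combiner output above
        System.out.println(extractXStrings("ABCABCA", 4));
    }
}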
Reducer
Now I simply have to count the occurrences of each x-string (yes, that's all the algorithm does).
This should be the reduce output:

ABCA:2
BCAB:1
CABC:1
BCAG:1
CAGD:1

The problem is that the actual output is:

ABCA:1
ABCA:1
BCAB:1
CABC:1
BCAG:1
CAGD:1

My reducer is currently very similar to WordCount: it just iterates over the values and sums them. I'm fairly sure the reduce task (I set the job to setNumReduceTasks(1)) is somehow producing the wrong output because it does not bring all the data together.
What do you think of this structure?
I chose to extract the x-strings in the combiner step; is that the right place to do it, or is it part of the problem I'm seeing?
Note: because of my algorithm, the combiner emits more output records than it receives as input. Is that a problem?
Code (simplified, with the non-Hadoop logic stripped out):

public class StringReader extends Mapper<NullWritable, RecordInterface, LongWritable, BytesWritable> {
    public void map(NullWritable key, RecordInterface value, Context context) throws IOException, InterruptedException {
        HadoopRun.util.extractSuperKmersNew(value.getValue().getBytes(), context);
    }
}

public void extractSuperKmersNew(byte[] r1, Mapper<NullWritable, RecordInterface, LongWritable, BytesWritable>.Context context) {

....
    context.write(new LongWritable(current_signature),new BytesWritable(super_kmer));
....
}

public class Combiner extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {

    protected void reduce(LongWritable arg0, Iterable<BytesWritable> arg1,
            Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context)
            throws IOException, InterruptedException {
        for (BytesWritable val : arg1) {
            extractKmers(val.getBytes(), context);
        }
    }
}

public void extractKmers(byte[] superkmer, Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context arg2) {

    int end = superkmer.length - k + 1;

    // Extraction of the k-strings from the aggregated strings
    for (int i = 0; i < end; i++) {
        long l = byteArrayToLong(superkmer, i);
        try {
            // quickfix to send to the reducer Key = k-string, Value = 1
            byte[] ONE = new byte[1];
            ONE[0] = 1;
            arg2.write(new LongWritable(l), new BytesWritable(ONE));
        } catch (IOException | InterruptedException e) {
            // swallowed in this simplified snippet
        }
    }
}

public class CounterReducer extends Reducer<LongWritable, BytesWritable, Text, IntWritable> {

    protected void reduce(LongWritable kmer, Iterable<BytesWritable> count,
            Reducer<LongWritable, BytesWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        for (BytesWritable val : count) {
            sum += 1;
        }
        context.write(new Text(LongWritableToText(kmer)), new IntWritable(sum));

    }
}

public class HadoopRun extends Configured implements Tool {

    public static Utility util;

    public int run(String[] args) throws Exception {
        /* HADOOP START */
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "Mapping Strings");
        job.setJarByClass(HadoopRun.class);
        job.setMapperClass(StringReader.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(CounterReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class); 
        job.setPartitionerClass(KPartitioner.class); 

        job.setNumReduceTasks(1);
        job.setInputFormatClass(FASTAShortInputFileFormat.class);
        FASTAShortInputFileFormat.addInputPath(job, new Path(conf.get("file_in")));
        FileOutputFormat.setOutputPath(job, new Path(conf.get("file_out")));
        return job.waitForCompletion(true) ? 0 : 1;
    }

public static void main(String[] args) throws Exception {

        //...
        //managing input arguments
        //...

        CommandLineParser parser = new BasicParser();
        HelpFormatter formatter = new HelpFormatter();
        try {
            cmd = parser.parse(options, args);
        } catch (ParseException e) {
            formatter.printHelp("usage:", options);
            System.exit(1);
        }
        Integer k = Integer.parseInt(cmd.getOptionValue("k"));
        Integer m = Integer.parseInt(cmd.getOptionValue("m"));
        String file_in_string = cmd.getOptionValue("file_in");
        String file_out_string = cmd.getOptionValue("file_out");
        Configuration conf = new Configuration();
        conf.set("file_in", file_in_string);
        conf.set("file_out", file_out_string);
        util = new Utility(k, m);
        int res = ToolRunner.run(conf, new HadoopRun(), args);

        System.exit(res);
    }

}
r6hnlfcb #1

If you want a method that counts the number of occurrences of every substring of length x:

/**
     * 
     * @param s string to get substrings from
     * @param x length of the substrings you want
     * @return hashmap with each substring as a key and its number of occurrences as the value
     */
    public static HashMap<String, Integer> countSub(String s, int x) {
        HashMap<String, Integer> hm = new HashMap<>();
        int to = s.length() - x + 1;

        for (int i = 0; i < to; i++) {
            String next = s.substring(i, i + x);
            if (hm.containsKey(next)) {
                // if the key already exists, increment its value
                hm.put(next, hm.get(next) + 1);
            } else {
                // else make a new key
                hm.put(next, 1);
            }
        }
        return hm;
    }

The method returns a HashMap that looks somewhat like the format used in your question, where each key is a substring and each value is the number of times that substring occurs.
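
For example, a main like this placed in the same class as countSub (just an illustrative usage on the string from your question) reproduces the expected counts:

public static void main(String[] args) {
    // illustrative usage on the string from the question; HashMap order is not
    // guaranteed, but the counts are ABCA:2, BCAB:1, CABC:1, BCAG:1, CAGD:1
    HashMap<String, Integer> counts = countSub("ABCABCAGD", 4);
    for (java.util.Map.Entry<String, Integer> e : counts.entrySet()) {
        System.out.println(e.getKey() + ":" + e.getValue());
    }
}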
