字符串转换的hadoop mapreduce示例

f3temu5u  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(395)

我在一些文本文件中有大量的字符串,需要通过这样的算法来转换这些字符串:将字符串转换为小写并删除所有空格。
你能给我举一个hadoop mapreduce函数的例子来实现这个算法吗?
谢谢您。

y3bcpkx1

y3bcpkx11#

在我玩map reduce的时候,我有一个类似的想法,那就是一定有一些实践或技巧,通过这些实践或技巧,我们可以修改记录中的每个单词,并完成所有的清理工作。
当我们回顾map reduce的整个算法时,我们有一个map函数,它在定界符的帮助下将传入的记录拆分为令牌(也许您会对它们有更好的了解)。现在,让我们试着用一种描述性的方式来处理你给出的问题陈述。
以下是我刚接触map reduce时会尝试做的事情:

> I will probably write a map() method which will split the lines for me
> I will possibly run out of options and write a reduce function
 and somehow will be able to achieve my objective

上面的练习是完全可以的,但是有一个更好的技巧可以帮助你决定是否需要reduce函数,因此你将有更多的选择来让你思考,完全专注于实现你的目标,同时也考虑优化你的代码。
在这种情况下,你的问题陈述陷入其中,一个班级来拯救我: ChainMapper 现在,链Map器将如何工作?以下是需要考虑的几点
->第一个Map器将从hdfs读取文件,按照分隔符分割每一行,并将令牌存储在上下文中。
->第二个Map器将从第一个Map器获得输出,在这里您可以根据业务需要执行各种与字符串相关的操作,如加密文本或更改为大写或小写等。
->作为第二个Map器结果的操作字符串应再次存储到上下文中
->现在,如果您需要一个reducer来完成诸如wordcount之类的聚合任务,那就去做吧。
我有一段代码可能效率不高(或者有些人可能觉得很糟糕),但它符合您的目的,因为您可能在玩mapreduce。
拆分Map器.java

public class SplitMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    @Override
    public void map(Object key,Text value,Context context)
                                    throws IOException,InterruptedException{
        StringTokenizer xs=new StringTokenizer(value.toString());
        IntWritable dummyValue=new IntWritable(1);
        while(xs.hasMoreElements()){
            String content=(String)xs.nextElement();
            context.write(new Text(content),dummyValue);
        }
    }
}

小写Map.java

public class LowerCaseMapper extends Mapper<Text,IntWritable,Text,IntWritable>{
    @Override
    public void map(Text key,IntWritable value,Context context) 
                                        throws IOException,InterruptedException{
        String val=key.toString().toLowerCase();
        Text newKey=new Text(val);
        Context.write(newKey,value);    
    }
}

因为我在这里执行字数统计,所以我需要一个减速机
chainmapreducer.java文件

public class ChainMapReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    @Override
    public void reduce(Text key,Iterable<IntWritable> value,Context context)
                                throws IOException,InterruptedException{
        int sum=0;
        for(IntWritable v:value){
            sum+=value.get();
        }
        context.write(key,new IntWritables(sum));
    }
}

为了能够成功地实现chainmapper的概念,您必须注意驱动程序类的每一个细节
驱动程序类.java

public class DriverClass extends Configured implements Tool{
    static Configuration cf;
    public int run(String args[]) throws IOException,InterruptedException,ClassNotFoundException{
        cf=new Configuration();
        Job j=Job.getInstance(cf);
        //configuration for the first mapper
        Configuration.splitMapConfig=new Configuration(false);
        ChainMapper.addMapper(j,SplitMapper.class,Object.class,Text.class,Text.class,IntWritable.class,splitMapConfig);
        //configuration for the second mapper
        Configuration.lowerCaseConfig=new Configuration(false);
        ChainMapper.addMapper(j,LowerCaseMapper.class,Text.class,IntWritable.class,Text.class,IntWritable.class,lowerCaseConfig);

        j.setJarByClass(DriverClass.class);
        j.setCombinerClass(ChainMapReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path outputPath=new Path(args[1]);
        FileInputFormat.addInputPath(j,new Path(args[0]));
        FileOutputFormat.setOutputPath(j,outputPath);
        outputPath.getFileSystem(cf).delete(outputPath,true);
    }
    public static void main(String args[]) throws Exception{
        int res=ToolRunner.run(cf,new DriverClass(),args);
        System.exit(1);
    }
}

driver类非常容易理解,只需要观察 ChainMapper.add(<job-object>,<Map-ClassName>,<Input arguments types>,<configuration-for-the-concerned-mapper>) 我希望解决方案符合您的目的,请让我知道,如果任何问题,可能会出现在您尝试实施。
谢谢你!

r3i60tvu

r3i60tvu2#

我尝试了下面的代码并在一行中获得了输出。
公务舱司机{

public static class textMapper extends Mapper<LongWritable,Text,NullWritable,Text>
{
    Text outvalue=new Text();

    public void map(LongWritable key,Text values,Context context) throws IOException, InterruptedException
    {
        String token;
        StringBuffer br=new StringBuffer();
        StringTokenizer st=new StringTokenizer(values.toString());
        while(st.hasMoreTokens())
        {
            token=st.nextToken();
            br.append(token.toUpperCase()); 
        }
        st=null;
        outvalue.set(br.toString());
        context.write(NullWritable.get(), outvalue);
        br=null;

    }
}
public static class textReduce extends Reducer<NullWritable,Text,NullWritable,Text>
{
    Text outvale=new Text();
    public void reduce(NullWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException
    {
        StringBuffer br=new StringBuffer();
        for(Text st:values)
        {
            br.append(st.toString());
        }
        outvale.set(br.toString());
        context.write(NullWritable.get(), outvale);
    }
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf=new Configuration();
    @SuppressWarnings("deprecation")
    Job job=new Job(conf,"touipprr");

    job.setJarByClass(toUpper.class);
    job.setMapperClass(textMapper.class);
    job.setReducerClass(textReduce.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true)?1:0);

}

}

相关问题