Java parsing of Stack Overflow's posts.xml on Hadoop

9lowa7mx  posted on 2021-06-03  in  Hadoop
Follow (0) | Answers (2) | Views (277)

I am following Anoop Madhusudanan's article on CodeProject to build a recommendation engine, not on a cluster but on my own machine.
The problem is that when I try to parse posts.xml, it is structured like this:

<row Id="99" PostTypeId="2" ParentId="88" CreationDate="2008-08-01T14:55:08.477" Score="2" Body="&lt;blockquote&gt;&#xD;&#xA;  &lt;p&gt;The actual resolution of gettimeofday() depends on the hardware architecture. Intel processors as well as SPARC machines offer high resolution timers that measure microseconds. Other hardware architectures fall back to the system’s timer, which is typically set to 100 Hz. In such cases, the time resolution will be less accurate. &lt;/p&gt;&#xD;&#xA;&lt;/blockquote&gt;&#xD;&#xA;&#xD;&#xA;&lt;p&gt;I obtained this answer from &lt;a href=&quot;http://www.informit.com/guides/content.aspx?g=cplusplus&amp;amp;seqNum=272&quot; rel=&quot;nofollow&quot;&gt;High Resolution Time Measurement and Timers, Part I&lt;/a&gt;&lt;/p&gt;" OwnerUserId="25" LastActivityDate="2008-08-01T14:55:08.477" />

Now I need to parse this file (1.4 GB) on Hadoop. I have written the Java code and built it into a jar. The Java class is as follows:

import java.io.IOException;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.DocumentBuilder;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.w3c.dom.Node;
import org.w3c.dom.Element;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class Recommend {

    static class Map extends Mapper<Text, Text, Text, Text> {
        Path path;
        String fXmlFile;
        DocumentBuilderFactory dbFactory;
        DocumentBuilder dBuilder;
        Document doc;

        /**
         * Given an output filename, write a bunch of random records to it.
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try{
                fXmlFile=value.toString();
                dbFactory = DocumentBuilderFactory.newInstance();
                dBuilder= dbFactory.newDocumentBuilder();
                doc= dBuilder.parse(fXmlFile);

                doc.getDocumentElement().normalize();
                NodeList nList = doc.getElementsByTagName("row");

                for (int temp = 0; temp < nList.getLength(); temp++) {

                    Node nNode = nList.item(temp);
                    Element eElement = (Element) nNode;

                    Text keyWords =new Text(eElement.getAttribute("OwnerUserId"));
                    Text valueWords = new Text(eElement.getAttribute("ParentId"));
                    String val=keyWords.toString()+" "+valueWords.toString();
                    // Write the sentence 
                    if(keyWords != null && valueWords != null){
                        output.collect(keyWords, new Text(val));
                    }
                }

            }catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }

    /**
     * 
     * @throws IOException 
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        /*if (args.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }*/
//      FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Recommend");
        job.setJarByClass(Recommend.class);

        // the keys are words (strings)
        job.setOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // the values are counts (ints)
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        //conf.setReducerClass(Reduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
         Path outPath = new Path(args[1]);
            FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
            if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
            }
    }
}

I expected the output to be a file on Hadoop containing OwnerUserId ParentId pairs, but what I get instead is:

1599788   <row Id="2292" PostTypeId="2" ParentId="2284" CreationDate="2008-08-05T13:28:06.700" Score="0" ViewCount="0" Body="&lt;p&gt;The first thing you should do is contact the main people who run the open source project. Ask them if it is ok to contribute to the code and go from there.&lt;/p&gt;&#xD;&#xA;&#xD;&#xA;&lt;p&gt;Simply writing your improved code and then giving it to them may result in your code being rejected.&lt;/p&gt;" OwnerUserId="383" LastActivityDate="2008-08-05T13:28:06.700" />

I have no idea where the value 1599788, which shows up as the mapper's key, comes from.
I don't really understand how to write a mapper class for Hadoop, and I need help modifying the code to get the desired output.
Thanks in advance.


gzjq41n41#

Below is the mapper I wrote to parse the SO posts XML and produce a tab-separated file on Hadoop for use by other MapReduce jobs, Hive, or Pig.
Mapper

package com.aravind.learning.hadoop.mapred.techtalks;

import java.io.IOException;
import java.io.StringReader;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import com.google.common.base.Joiner;

public class StackoverflowDataWranglerMapper extends Mapper<LongWritable, Text, Text, Text>
{
    static enum BadRecordCounters
    {
        NO_CREATION_DATE, UNKNOWN_USER_ID, UNPARSEABLE_RECORD, UNTAGGED_POSTS
    }

    private final Text outputKey = new Text();
    private final Text outputValue = new Text();

    private final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    private DocumentBuilder builder;
    private static final Joiner TAG_JOINER = Joiner.on(",").skipNulls();
    // 2008-07-31T21:42:52.667
    private static final DateFormat DATE_PARSER = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
    private static final SimpleDateFormat DATE_BUILDER = new SimpleDateFormat("yyyy-MM-dd");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        try
        {
            builder = factory.newDocumentBuilder();
        }
        catch (ParserConfigurationException e)
        {
            // rethrow rather than discard the exception, otherwise builder is left null
            throw new IOException(e);
        }
    }

    @Override
    protected void map(LongWritable inputKey, Text inputValue, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException
    {
        try
        {
            String entry = inputValue.toString();
            if (entry.contains("<row "))
            {
                Document doc = builder.parse(new InputSource(new StringReader(entry)));
                Element rootElem = doc.getDocumentElement();

                String id = rootElem.getAttribute("Id");
                String postedBy = rootElem.getAttribute("OwnerUserId").trim();
                String viewCount = rootElem.getAttribute("ViewCount");
                String postTypeId = rootElem.getAttribute("PostTypeId");
                String score = rootElem.getAttribute("Score");
                String title = rootElem.getAttribute("Title");
                String tags = rootElem.getAttribute("Tags");
                String answerCount = rootElem.getAttribute("AnswerCount");
                String commentCount = rootElem.getAttribute("CommentCount");
                String favoriteCount = rootElem.getAttribute("FavoriteCount");
                String creationDate = rootElem.getAttribute("CreationDate");

                Date parsedDate = null;
                if (creationDate != null && creationDate.trim().length() > 0)
                {
                    try
                    {
                        parsedDate = DATE_PARSER.parse(creationDate);
                    }
                    catch (ParseException e)
                    {
                        context.getCounter("Bad Record Counters", "Posts missing CreationDate").increment(1);
                    }
                }

                if (postedBy.length() == 0 || postedBy.trim().equals("-1"))
                {
                    context.getCounter("Bad Record Counters", "Posts with either empty UserId or UserId contains '-1'")
                            .increment(1);
                    try
                    {
                        parsedDate = DATE_BUILDER.parse("2100-00-01");
                    }
                    catch (ParseException e)
                    {
                        // ignore
                    }
                }

                tags = tags.trim();
                String tagTokens[] = null;

                if (tags.length() > 1)
                {
                    tagTokens = tags.substring(1, tags.length() - 1).split("><");
                }
                else
                {
                    context.getCounter("Bad Record Counters", "Untagged Posts").increment(1);
                }

                outputKey.clear();
                outputKey.set(id);

                StringBuilder sb = new StringBuilder(postedBy).append("\t").append(parsedDate.getTime()).append("\t")
                        .append(postTypeId).append("\t").append(title).append("\t").append(viewCount).append("\t").append(score)
                        .append("\t");

                if (tagTokens != null)
                {
                    sb.append(TAG_JOINER.join(tagTokens)).append("\t");
                }
                else
                {
                    sb.append("").append("\t");
                }
                sb.append(answerCount).append("\t").append(commentCount).append("\t").append(favoriteCount).toString();

                outputValue.set(sb.toString());

                context.write(outputKey, outputValue);
            }
        }
        catch (SAXException e)
        {
            context.getCounter("Bad Record Counters", "Unparsable records").increment(1);
        }
        finally
        {
            builder.reset();
        }
    }
}

Driver

package com.aravind.learning.hadoop.mapred.techtalks;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StackoverflowDataWranglerDriver extends Configured implements Tool
{
    @Override
    public int run(String[] args) throws Exception
    {
        if (args.length != 2)
        {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job job = Job.getInstance(getConf());

        job.setJobName("Tech Talks - Stackoverflow Forum Posts - Data Wrangler");

        TextInputFormat.addInputPath(job, new Path(args[0]));

        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setJarByClass(StackoverflowDataWranglerMapper.class);// required for mr1
        job.setMapperClass(StackoverflowDataWranglerMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String args[]) throws Exception
    {
        int exitCode = ToolRunner.run(new Configuration(), new StackoverflowDataWranglerDriver(), args);
        System.exit(exitCode);
    }
}

Job submission command

hadoop jar ./hadoop-examples-0.0.1-SNAPSHOT.jar com.aravind.learning.hadoop.mapred.techtalks.StackoverflowDataWranglerDriver data/stackoverflow-posts.xml data/so-posts-tsv
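
For reference, the field order of each tab-separated output line follows the StringBuilder in the mapper above, with the Id key written first by TextOutputFormat (this layout is read off the code, not stated in the original answer):

Id <TAB> OwnerUserId <TAB> CreationDate (epoch millis) <TAB> PostTypeId <TAB> Title <TAB> ViewCount <TAB> Score <TAB> Tags (comma-joined) <TAB> AnswerCount <TAB> CommentCount <TAB> FavoriteCount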

xcitsw882#

After a lot of research and experimentation, I finally learned how to write a mapper that parses XML files with the same structure as the one I posted. I changed my approach, and here is my new mapper code... it works for my use case.
Hope it saves someone some time :)

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
    // use the NullWritable singleton so a non-null key is always written
    NullWritable obj = NullWritable.get();

    @Override
    public void map(LongWritable key, Text value, Context context) throws InterruptedException {
        StringTokenizer tok= new StringTokenizer(value.toString()); 
        String pa=null,ow=null,pi=null,v;
        while (tok.hasMoreTokens()) {
            String[] arr;
            String val = (String) tok.nextToken();
            if(val.contains("PostTypeId")){
                arr= val.split("[\"]");
                pi=arr[arr.length-1];
                if(pi.equals("2")){
                    continue;
                }
                else break;
            }
            if(val.contains("ParentId")){
                arr= val.split("[\"]");
                pa=arr[arr.length-1];
            } 
            else if(val.contains("OwnerUserId") ){
                arr= val.split("[\"]");
                ow=arr[arr.length-1];
                try {
                    if(pa!=null && ow != null){
                        // String.format uses %s placeholders; {0}/{1} is MessageFormat syntax
                        v=String.format("%s,%s", ow, pa);
                        context.write(obj,new Text(v));

                    }
                } catch (IOException e) {
                    // log the failure and keep processing the remaining tokens
                    e.printStackTrace();
                }
            }
        }

    }

}
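
The answer above only shows the mapper. For completeness, a minimal map-only driver sketch that could run it might look like the following (the class name RecommendDriver is made up here, and the Map class above is assumed to be compiled in the same package):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RecommendDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Recommend - extract OwnerUserId,ParentId");
        job.setJarByClass(RecommendDriver.class);

        // the Map class above emits NullWritable keys and "OwnerUserId,ParentId" Text values
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // map-only job: no reducer, so the mapper output is written directly
        job.setNumReduceTasks(0);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because the keys are NullWritable, TextOutputFormat writes only the value, so each output line is just "OwnerUserId,ParentId".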
