Processing JSON with Java MapReduce

91zkwejq · posted 2021-06-02 in Hadoop

I am new to Hadoop MapReduce.
The data in my input text file is stored as follows, one JSON object per line. Here are just a few of the tuples (data.txt):

{"author":"Sharīf Qāsim","book":"al- Rabīʻ al-manshūd"}
{"author":"Nāṣir Nimrī","book":"Adīb ʻAbbāsī"}
{"author":"Muẓaffar ʻAbd al-Majīd Kammūnah","book":"Asmāʼ Allāh al-ḥusná al-wāridah fī muḥkam kitābih"}
{"author":"Ḥasan Muṣṭafá Aḥmad","book":"al- Jabhah al-sharqīyah wa-maʻārikuhā fī ḥarb Ramaḍān"}
{"author":"Rafīqah Salīm Ḥammūd","book":"Taʻlīm fī al-Baḥrayn"}

This is the Java file I am supposed to write my code in (CombineBooks.java):

package org.hwone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

//TODO import necessary components

/*
 * Modify this file to combine books from the same author into a
 * single JSON object.
 * i.e. {"author": "Tobias Wells", "books": [{"book":"A die in the country"},{"book": "Dinky died"}]}
 * Be aware that this may run on any number of nodes!
 */

public class CombineBooks {

  //TODO define variables and implement necessary components

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: CombineBooks <in> <out>");
      System.exit(2);
    }

    //TODO implement CombineBooks

    Job job = new Job(conf, "CombineBooks");

    //TODO implement CombineBooks

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

My task is to create a Hadoop program in CombineBooks.java, returned in the "question-2" directory. The program should do the following: given the input author-book tuples, the MapReduce program should produce a JSON object which contains all the books from the same author in a JSON array, i.e.:

{"author": "Tobias Wells", "books":[{"book":"A die in the country"},{"book": "Dinky died"}]}

Any idea how to do it?

Answer 1 (wfveoks0):

First of all, the JSON classes you are trying to use are not available to you. To resolve this:
Go here and download as zip: https://github.com/douglascrockford/json-java
Extract it into your sources folder under the subdirectory org/json/*
Next, the first line of your code makes a package "org.json", which is incorrect; you should create a separate package instead, for instance "my.books".
Third, using a combiner here is useless: the reducer's output types (NullWritable, Text) differ from the map output types (Text, Text), so the Reduce class cannot be reused as a combiner, and partially merging the JSON would gain nothing.
Here is the code I ended up with; it works and solves your problem:

package my.books;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.json.*;

public class CombineBooks {

    // Mapper: parses the JSON tuple on each input line and emits (author, book) pairs.
    public static class Map extends Mapper<LongWritable, Text, Text, Text>{

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

            String author;
            String book;
            String line = value.toString();
            // TextInputFormat delivers one line per call, so this split normally
            // yields a single element; the loop is kept as a safeguard.
            String[] tuple = line.split("\\n");
            try{
                for(int i=0;i<tuple.length; i++){
                    JSONObject obj = new JSONObject(tuple[i]);
                    author = obj.getString("author");
                    book = obj.getString("book");
                    context.write(new Text(author), new Text(book));
                }
            }catch(JSONException e){
                e.printStackTrace();
            }
        }
    }

    // Reducer: receives all books of one author, wraps each title in its own
    // {"book": ...} object, collects them into a JSON array, and emits the
    // combined JSON object as a single line of text.
    public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{

            try{
                JSONObject obj = new JSONObject();
                JSONArray ja = new JSONArray();
                for(Text val : values){
                    JSONObject jo = new JSONObject().put("book", val.toString());
                    ja.put(jo);
                }
                obj.put("books", ja);
                obj.put("author", key.toString());
                context.write(NullWritable.get(), new Text(obj.toString()));
            }catch(JSONException e){
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: CombineBooks <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "CombineBooks");
        job.setJarByClass(CombineBooks.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
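
To see in isolation what the reducer assembles for one author, here is a minimal standalone sketch (not part of the job) using the same org.json calls; the class name and sample data are made up for illustration:

import org.json.JSONArray;
import org.json.JSONObject;

public class ReducerOutputDemo {
    public static void main(String[] args) throws Exception {
        JSONObject obj = new JSONObject();
        JSONArray ja = new JSONArray();
        for (String book : new String[]{"book1", "book2", "book3"}) {
            // Wrap each title in its own {"book": ...} object, as the reducer does.
            ja.put(new JSONObject().put("book", book));
        }
        obj.put("books", ja);
        obj.put("author", "author1");
        // Prints something like (org.json does not guarantee key order):
        // {"books":[{"book":"book1"},{"book":"book2"},{"book":"book3"}],"author":"author1"}
        System.out.println(obj.toString());
    }
}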

Here is the folder structure of my project:

src
src/my
src/my/books
src/my/books/CombineBooks.java
src/org
src/org/json
src/org/json/zip
src/org/json/zip/BitReader.java
...
src/org/json/zip/None.java
src/org/json/JSONStringer.java
src/org/json/JSONML.java
...
src/org/json/JSONException.java
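
One possible way to compile this layout and package it into bookparse.jar (a sketch; it assumes the hadoop command is installed, so that hadoop classpath resolves the Hadoop jars):

mkdir -p classes
javac -cp $(hadoop classpath) -d classes src/my/books/CombineBooks.java src/org/json/*.java src/org/json/zip/*.java
jar cf bookparse.jar -C classes .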

Here is the input:

[localhost:CombineBooks]$ hdfs dfs -cat /example.txt
{"author":"author1", "book":"book1"}
{"author":"author1", "book":"book2"}
{"author":"author1", "book":"book3"}
{"author":"author2", "book":"book4"}
{"author":"author2", "book":"book5"}
{"author":"author3", "book":"book6"}

The command to run:

hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output

And here is the output (note that the order of books inside each array is not guaranteed):

[pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000
{"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"}
{"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"}
{"books":[{"book":"book6"}],"author":"author3"}

You can use one of three options to make the org.json.* classes available on your cluster:
1. Pack the org.json.* classes into your jar file (this can easily be done with a GUI IDE). This is the option I used in my answer.
2. Put the jar file containing the org.json.* classes into a classpath directory on every cluster node (see yarn.application.classpath).
3. Put the jar file containing org.json.* into HDFS (hdfs dfs -put <org.json jar> <hdfs path>) and use the job.addFileToClassPath call to make this jar file available to all tasks executing your job on the cluster. In my answer you would add job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>")); to main; see the sketch below.
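
For the third option, a minimal sketch of how that call fits into main (the HDFS path below is a placeholder, not a real location):

// Inside main(), after the Job has been created:
Job job = new Job(conf, "CombineBooks");
// Hypothetical location; replace with wherever you uploaded the org.json jar on HDFS.
job.addFileToClassPath(new Path("/libs/json.jar"));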

Answer 2 (zz2j4svz):

Take a look at json-mapreduce, which provides an input format that can split files containing multi-line JSON: https://github.com/alexholmes/json-mapreduce
