java—使用elasticsearch hadoop map reduce将json从hdfs写入elasticsearch

icnyk63a  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(431)

我们将一些json数据存储到hdfs中,并尝试使用elasticsearch hadoop map reduce将数据摄取到elasticsearch中。
我们使用的代码非常简单(如下)

public class TestOneFileJob extends Configured implements Tool {

    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        public void map(LongWritable arg0, Text value, OutputCollector<LongWritable, Text> output,
                Reporter reporter) throws IOException {

            output.collect(arg0, value);
        }

    }

    @Override
    public int run(String[] args) throws Exception {

        JobConf job = new JobConf(getConf(), TestOneFileJob.class);

        job.setJobName("demo.mapreduce");
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(EsOutputFormat.class);
        job.setMapperClass(Tokenizer.class);
        job.setSpeculativeExecution(false);

        FileInputFormat.setInputPaths(job, new Path(args[1]));

        job.set("es.resource.write", "{index_name}/live_tweets");

        job.set("es.nodes", "els-test.css.org");

        job.set("es.input.json", "yes");
        job.setMapOutputValueClass(Text.class);

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new TestOneFileJob(), args));
    }
}

这个代码工作得很好,但我们有两个问题。
第一个问题是价值 es.resource.write 财产。目前由物业提供 index_name 来自json。
如果json包含array类型的属性

{
"tags" : [{"tag" : "tag1"}, {"tag" : "tag2"}]
}

我们如何配置 es.resource.write 抢先一步 tag 例如,值?
我们试着用 {tags.tag} 以及 {tags[0].tag} 但两者都不起作用。
另一个问题是,如何使作业索引为tags属性的两个值中的json文档?

vyu0f0g1

vyu0f0g11#

我们通过以下步骤解决了这两个问题
1-在run方法中,我们将 es.resource.write 如下所示

job.set("es.resource.write", "{tag}/live_tweets");

2-在map函数中,我们使用gson库将json转换成一个对象

Object currentValue = gson.fromJson(jsonString, Object.class);

这里的对象是 POJO 我们的json
3-我们可以从对象中提取所需的标记,并将其值作为新属性添加到json中。
前面的步骤解决了第一个问题。关于第二个问题(如果我们希望根据标记的数量将同一个json存储到多个索引中),我们只需遍历json中的标记并更改添加的标记属性,然后将json再次传递给收集器。以下是此步骤所需的代码。

@Override
        public void map(LongWritable arg0, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {

            List<String> tags = getTags(value.toString());

            for (String tag : tags) {

                String newJson = value.toString().replaceFirst("\\{", "{\"tag\":\""+tag+"\",");

                output.collect(arg0, new Text(newJson));
            }
        }

相关问题