How can I read all the Common Crawl data from AWS in Java?

xytpbqjk asked on 2021-05-29 in Hadoop

I am completely new to Hadoop and MapReduce programming, and I am trying to write my first MapReduce program using Common Crawl data.
I would like to read all the April 2015 data from AWS. For example, if I wanted to download all the April 2015 data from the command line, I would run:
s3cmd get s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-18/segments/1429246633512.41/wat/*.warc.wat.gz
This command line works, but I don't want to download all the April 2015 data; I only want to read all the "warc.wat.gz" files (so I can analyze the data).
I tried to create my job like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
// WARCFileInputFormat and FirstJobUrlTypeMap come from the Common Crawl example code;
// their imports are omitted here.

public class FirstJob extends Configured implements Tool {
    private static final Logger LOG = Logger.getLogger(FirstJob.class);

    /**
     * Main entry point that uses the {@link ToolRunner} class to run the Hadoop
     * job.
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FirstJob(), args);
        System.out.println("done !!");
        System.exit(res);
    }

    /**
     * Builds and runs the Hadoop job.
     * 
     * @return 0 if the Hadoop job completes successfully and 1 otherwise.
     */
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        //
        Job job = new Job(conf);
        job.setJarByClass(FirstJob.class);
        job.setNumReduceTasks(1);

        //String inputPath = "data/*.warc.wat.gz";
        String inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-18/segments/1429246633512.41/wat/*.warc.wat.gz";
        LOG.info("Input path: " + inputPath);
        FileInputFormat.addInputPath(job, new Path(inputPath));

        String outputPath = "/tmp/cc-firstjob/";
        FileSystem fs = FileSystem.newInstance(conf);
        if (fs.exists(new Path(outputPath))) {
            fs.delete(new Path(outputPath), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setInputFormatClass(WARCFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(FirstJobUrlTypeMap.ServerMapper.class);
        job.setReducerClass(LongSumReducer.class);

        if (job.waitForCompletion(true)) {
            return 0;
        } else {
            return 1;
        }
    }
}
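The mapper referenced above, FirstJobUrlTypeMap.ServerMapper, is not shown here. Purely as a rough sketch, a minimal mapper compatible with this job setup could look like the code below; it assumes WARCFileInputFormat is the input format from the Common Crawl example code, which hands each .warc.wat.gz file to the mapper as a (Text, ArchiveReader) pair, with ArchiveReader and ArchiveRecord coming from the webarchive-commons library (these types are an assumption, not something stated in the post):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.archive.io.ArchiveReader;   // assumed: webarchive-commons
import org.archive.io.ArchiveRecord;   // assumed: webarchive-commons

public class FirstJobUrlTypeMap {

    public static class ServerMapper extends Mapper<Text, ArchiveReader, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);
        private final Text outKey = new Text();

        @Override
        protected void map(Text key, ArchiveReader value, Context context)
                throws IOException, InterruptedException {
            // Each ArchiveReader iterates over the records of one WAT file;
            // this sketch simply counts records per target URL.
            for (ArchiveRecord record : value) {
                String url = record.getHeader().getUrl();
                if (url != null) {
                    outKey.set(url);
                    context.write(outKey, ONE);
                }
            }
        }
    }
}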

But when I run it, I get this error:
Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
How can I solve my problem? Thanks in advance.

ca1c2owp1#

I solved my problem. In the code, change:

Configuration conf = getConf();
//
Job job = new Job(conf);
to:
Configuration conf = new Configuration();
conf.set("fs.s3n.awsAccessKeyId", "your_key");
conf.set("fs.s3n.awsSecretAccessKey", "your_key");
Job job = new Job(conf);
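As an alternative to hardcoding the keys: since FirstJob extends Configured and is launched through ToolRunner, the same two properties can also be supplied as generic options on the command line while keeping Configuration conf = getConf();, for example (the jar name firstjob.jar is just a placeholder):

hadoop jar firstjob.jar FirstJob \
    -D fs.s3n.awsAccessKeyId=YOUR_ACCESS_KEY \
    -D fs.s3n.awsSecretAccessKey=YOUR_SECRET_KEY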
