Joining two CSV files on an id column in Hadoop

fdx2calv · published 2021-06-01 · in Hadoop

I'm new to Hadoop and I don't know how to join two CSV files together. Here are my two CSV files:
orders_dataset.csv

order_id   order_approved_at    order_delivered_customer_date
---------- -------------------  -------------------------------
     1     2017-10-02 19:55:00       2017-10-04 04:39:00
     2     2017-01-26 14:16:31       2017-02-02 14:08:10
     3     2018-06-09 03:13:12       2018-06-19 12:05:52

order_reviews_dataset.csv

order_id   customer_id     review_score
---------- -------------  --------------
     1          12              3
     2          23              4
     3          93              5

I would like a result file like this:

delivery_time in days  avg_review_score 
---------------------- ------------------ 
          1                    3.03
          2                    4.5
          3                    3.76
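The result being asked for is a join on `order_id` followed by a group-by on delivery time with an average. Ignoring Hadoop for a moment, the computation can be sketched in plain Java; the class name `JoinSketch` and the in-memory sample rows below are mine, for illustration only:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class JoinSketch {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        // order_id -> {approved_at, delivered_at}, as in orders_dataset.csv
        Map<Integer, String[]> orders = Map.of(
                1, new String[]{"2017-10-02 19:55:00", "2017-10-04 04:39:00"},
                2, new String[]{"2017-01-26 14:16:31", "2017-02-02 14:08:10"});

        // order_id -> review_score, as in order_reviews_dataset.csv
        Map<Integer, Integer> reviews = Map.of(1, 3, 2, 4);

        // Join on order_id, then group review scores by delivery time in days.
        Map<Long, List<Integer>> scoresByDays = new TreeMap<>();
        for (Map.Entry<Integer, String[]> e : orders.entrySet()) {
            Integer score = reviews.get(e.getKey());
            if (score == null) continue;  // no matching review for this order
            LocalDateTime approved = LocalDateTime.parse(e.getValue()[0], fmt);
            LocalDateTime delivered = LocalDateTime.parse(e.getValue()[1], fmt);
            long days = Duration.between(approved, delivered).toDays();
            scoresByDays.computeIfAbsent(days, k -> new ArrayList<>()).add(score);
        }

        // Average the scores per delivery time and print one line per day count.
        scoresByDays.forEach((days, scores) -> {
            double avg = scores.stream().mapToInt(Integer::intValue).average().orElse(0);
            System.out.println(days + "\t" + avg);
        });
    }
}
```

The MapReduce job has to reproduce exactly this: the `reviews.get(...)` lookup is the join, and the `scoresByDays` grouping is what the shuffle gives you for free when `days` is the key.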

So far I have computed the delivery time, but I don't know how to use the second CSV file to add the review scores. Here is my code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Question {

    public static void main(String[] args) throws Exception
    {
        if (args.length != 1) {
            System.err.println("Usage: Question <input path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Job Title");
        job.setJarByClass(Question.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path("./output/question3");
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete a previous run's output so the job can be re-run.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        job.setMapperClass(QuestionMapper.class);
        job.setReducerClass(QuestionReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class QuestionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text jourDelaiLivraison = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        // Skip the CSV header line.
        if (key.get() == 0)
            return;

        String ligne = value.toString();
        String[] tokens = ligne.split(",");

        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            Date order_approved_at = formatter.parse(tokens[4]);
            Date order_delivered_customer_date = formatter.parse(tokens[6]);

            // The order must have been delivered
            // and the dates must be consistent.
            if (tokens[2].equals("delivered") && order_approved_at.compareTo(order_delivered_customer_date) < 0) {
                long diff = order_delivered_customer_date.getTime() - order_approved_at.getTime();
                String delai = String.valueOf(TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS));
                jourDelaiLivraison.set(delai);
                // Write inside the if: otherwise rejected records would be
                // emitted under the previous (stale) key.
                context.write(jourDelaiLivraison, new IntWritable(1));
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class QuestionReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Map<Text, Integer> delaiNoteSatisfaction;

    @Override
    public void setup(Context context) {
        this.delaiNoteSatisfaction = new HashMap<>();
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    {
        // Count the orders that share the same delivery time.
        int count = StreamSupport.stream(values.spliterator(), false)
                .mapToInt(IntWritable::get)
                .sum();

        // Copy the key: Hadoop reuses the same Text instance between calls.
        delaiNoteSatisfaction.put(new Text(key), count);
    }

    @Override
    public void cleanup(Context context) {
        // Emit the delivery times in ascending numeric order.
        List<Text> keyList = new ArrayList<>(delaiNoteSatisfaction.keySet());
        keyList.sort(Comparator.comparingInt((Text t) -> Integer.parseInt(t.toString())));
        keyList.forEach(key -> {
            try {
                context.write(key, new IntWritable(delaiNoteSatisfaction.get(key)));
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
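To bring the review scores in, the usual pattern is a reduce-side join: one mapper per input file (wired up with Hadoop's `MultipleInputs.addInputPath`, each file getting its own mapper class), both emitting `order_id` as the key, with a tag in the value saying which file the record came from (e.g. `D|<days>` from the orders file, `R|<score>` from the reviews file). The shuffle then delivers both tagged values for an order to the same reduce call, which pairs them; a second job (or the reducer itself, keyed by days) can compute the average score per delivery time. The tagging and pairing logic is plain string handling and can be sketched without the Hadoop classes; the class and method names below (`ReduceSideJoinSketch`, `joinOrder`, etc.) are mine:

```java
import java.util.List;
import java.util.Optional;

public class ReduceSideJoinSketch {

    // What the orders-file mapper would emit: key = order_id, value tagged "D|".
    static String[] mapOrder(long orderId, long deliveryDays) {
        return new String[]{String.valueOf(orderId), "D|" + deliveryDays};
    }

    // What the reviews-file mapper would emit: key = order_id, value tagged "R|".
    static String[] mapReview(long orderId, int reviewScore) {
        return new String[]{String.valueOf(orderId), "R|" + reviewScore};
    }

    // What the reducer does with all tagged values for one order_id:
    // pick out each side by its tag and pair delivery time with review score.
    static Optional<long[]> joinOrder(Iterable<String> taggedValues) {
        Long days = null;
        Long score = null;
        for (String v : taggedValues) {
            if (v.startsWith("D|")) days = Long.parseLong(v.substring(2));
            else if (v.startsWith("R|")) score = Long.parseLong(v.substring(2));
        }
        if (days == null || score == null) return Optional.empty(); // unmatched side
        return Optional.of(new long[]{days, score});
    }

    public static void main(String[] args) {
        // Simulate the shuffle: both mappers' values for order_id 1 arrive together.
        List<String> values = List.of(mapOrder(1, 1)[1], mapReview(1, 3)[1]);
        joinOrder(values).ifPresent(p ->
                System.out.println("days=" + p[0] + " score=" + p[1]));
    }
}
```

In the real job the reducer would emit `(days, score)` pairs, and averaging per `days` is then exactly the counting pattern the existing `QuestionReducer` already uses, with a sum-and-count instead of a plain count.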
