I'm new to Hadoop and don't know how to join two CSV files. Here are my two CSV files:
order_dataset.csv
order_id   order_approved_at   order_delivered_customer_date
---------- ------------------- -------------------------------
1          2017-10-02 19:55:00 2017-10-04 04:39:00
2          2017-01-26 14:16:31 2017-02-02 14:08:10
3          2018-06-09 03:13:12 2018-06-19 12:05:52
order_review_dataset.csv
order_id   customer_id   review_score
---------- ------------- --------------
1          12            3
2          23            4
3          93            5
I would like a result file like this:
delivery_time (days)   avg_review_score
---------------------- ------------------
1                      3.03
2                      4.5
3                      3.76
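For reference, the result described above amounts to an inner join of the two files on order_id followed by a group-by on delivery time. Below is a minimal in-memory sketch in plain Java (no Hadoop), assuming each row is already split into the three columns shown in the sample tables and that delivery time is truncated to whole days, as TimeUnit.DAYS.convert does in the mapper below. The class and method names are hypothetical. It uses java.time instead of SimpleDateFormat: LocalDateTime carries no time zone, so a daylight-saving change cannot shift a difference by an hour.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory reference (no Hadoop): inner join on order_id,
// then average review_score per delivery time in whole days.
class DeliveryReviewReference {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // orders rows:  {order_id, order_approved_at, order_delivered_customer_date}
    // reviews rows: {order_id, customer_id, review_score}
    static Map<Long, Double> averageByDeliveryDays(List<String[]> orders, List<String[]> reviews) {
        // 1. Delivery time per order, truncated to whole days
        Map<String, Long> daysByOrder = new HashMap<>();
        for (String[] o : orders) {
            LocalDateTime approved = LocalDateTime.parse(o[1], FMT);
            LocalDateTime delivered = LocalDateTime.parse(o[2], FMT);
            if (approved.isBefore(delivered)) { // skip inconsistent dates
                daysByOrder.put(o[0], Duration.between(approved, delivered).toDays());
            }
        }
        // 2. Join on order_id, accumulating {sum, count} per delivery time
        Map<Long, double[]> sumAndCount = new TreeMap<>();
        for (String[] r : reviews) {
            Long days = daysByOrder.get(r[0]);
            if (days == null) {
                continue; // no matching order: dropped, as in an inner join
            }
            double[] acc = sumAndCount.computeIfAbsent(days, d -> new double[2]);
            acc[0] += Double.parseDouble(r[2]);
            acc[1] += 1;
        }
        // 3. Average; TreeMap keeps delivery times in ascending numeric order
        Map<Long, Double> result = new TreeMap<>();
        sumAndCount.forEach((days, acc) -> result.put(days, acc[0] / acc[1]));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> orders = List.of(
                new String[] {"1", "2017-10-02 19:55:00", "2017-10-04 04:39:00"},
                new String[] {"2", "2017-01-26 14:16:31", "2017-02-02 14:08:10"},
                new String[] {"3", "2018-06-09 03:13:12", "2018-06-19 12:05:52"});
        List<String[]> reviews = List.of(
                new String[] {"1", "12", "3"},
                new String[] {"2", "23", "4"},
                new String[] {"3", "93", "5"});
        System.out.println(averageByDeliveryDays(orders, reviews)); // {1=3.0, 6=4.0, 10=5.0}
    }
}
```

On the three sample rows the delivery times are 1, 6 and 10 days with one review each, so the sketch prints {1=3.0, 6=4.0, 10=5.0}. The figures in the desired output above cannot be derived from these three rows alone.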
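So far, my job computes the delivery time of each order (in whole days) and counts the orders for each delivery time. I don't know how to use the second CSV file to add the review score. Here is my code: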
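import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Question {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: Question <input path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Job Title");
        job.setJarByClass(Question.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path("./output/question3");
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output of a previous run; the job fails if the directory exists
        outputPath.getFileSystem(conf).delete(outputPath, true);
        job.setMapperClass(QuestionMapper.class);
        job.setReducerClass(QuestionReducer.class);
        // QuestionReducer sorts in cleanup(), which is only a global order with one reducer
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}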
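import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class QuestionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text jourDelaiLivraison = new Text();
    // One formatter per mapper instance: a map task calls map() from a single thread
    private final DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // With TextInputFormat the key is the byte offset; offset 0 is the header line
        if (key.get() == 0)
            return;
        // Indices refer to the full orders file (tokens[2] holds the order status),
        // which has more columns than the excerpt shown above
        String[] tokens = value.toString().split(",");
        try {
            Date order_approved_at = formatter.parse(tokens[4]);
            Date order_delivered_customer_date = formatter.parse(tokens[6]);
            // The order must be delivered
            // and the dates must be consistent
            if (tokens[2].equals("delivered") && order_approved_at.compareTo(order_delivered_customer_date) < 0) {
                long diff = order_delivered_customer_date.getTime() - order_approved_at.getTime();
                String delai = String.valueOf(TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS));
                jourDelaiLivraison.set(delai);
                // Write inside the if: otherwise a skipped record re-emits the previous delay
                context.write(jourDelaiLivraison, ONE);
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}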
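One common way to combine the two files in MapReduce is a reduce-side join: each file gets its own mapper (registered in the driver with MultipleInputs.addInputPath(job, path, TextInputFormat.class, mapperClass) instead of FileInputFormat.addInputPath), both mappers emit order_id as the key, and each value carries a tag saying which file it came from. The minimal sketch below shows only the logic of the two map functions, in plain Java so it runs without a cluster. The "D:"/"R:" tags, the class name and the column positions are assumptions: order_id first in both files, review_score third in the reviews file as in the sample table, and the other indices copied from QuestionMapper.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the two map functions of a reduce-side join.
// In Hadoop each body would live in its own Mapper and end with context.write.
class JoinMapSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Orders file -> (order_id, "D:<delivery days>"), or empty if the line is skipped.
    static Optional<Map.Entry<String, String>> mapOrder(String line) {
        String[] t = line.split(",");
        // A header line also fails this check, since its third field is a column name
        if (t.length < 7 || !t[2].equals("delivered")) {
            return Optional.empty();
        }
        try {
            LocalDateTime approved = LocalDateTime.parse(t[4], FMT);
            LocalDateTime delivered = LocalDateTime.parse(t[6], FMT);
            if (!approved.isBefore(delivered)) {
                return Optional.empty(); // inconsistent dates
            }
            long days = Duration.between(approved, delivered).toDays();
            return Optional.of(Map.entry(t[0], "D:" + days));
        } catch (DateTimeParseException e) {
            return Optional.empty(); // missing or malformed date
        }
    }

    // Reviews file -> (order_id, "R:<review_score>"), or empty for the header.
    static Optional<Map.Entry<String, String>> mapReview(String line) {
        String[] t = line.split(",");
        if (t.length < 3 || t[0].equals("order_id")) {
            return Optional.empty();
        }
        return Optional.of(Map.entry(t[0], "R:" + t[2].trim()));
    }
}
```

Like the question's code, this splits naively on commas, so quoted fields that contain commas are not handled.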
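import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class QuestionReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private Map<Text, Integer> delaiNoteSatisfaction;

    @Override
    public void setup(Context context) {
        this.delaiNoteSatisfaction = new HashMap<>();
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        // Number of orders delivered in `key` days
        int count = StreamSupport.stream(values.spliterator(), false)
                .mapToInt(IntWritable::get)
                .sum();
        // Copy the key: Hadoop reuses the same Text instance between calls
        delaiNoteSatisfaction.put(new Text(key), count);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Text keys arrive in lexicographic order ("10" before "2"), so re-sort numerically
        List<Text> keyList = new ArrayList<>(delaiNoteSatisfaction.keySet());
        keyList.sort(Comparator.comparingInt((Text t) -> Integer.parseInt(t.toString())));
        for (Text key : keyList) {
            context.write(key, new IntWritable(delaiNoteSatisfaction.get(key)));
        }
    }
}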
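On the reduce side of a join keyed by order_id, each reduce call receives the tagged values from both files for one order, and that is where the join happens. Averaging per delivery time then needs a second grouping, on delivery days, which in Hadoop is typically a second job reading the first job's output (the driver runs the two jobs one after the other). The minimal sketch below shows both reduce steps in plain Java, with a TreeMap standing in for the second job's shuffle. It assumes values tagged "D:<days>" (orders file) and "R:<score>" (reviews file); the tags and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a reduce-side join followed by an averaging stage.
class JoinReduceSketch {

    // Stage 1 (join): all tagged values for ONE order_id -> (deliveryDays, score) pairs.
    static List<Map.Entry<Long, Integer>> joinReduce(Iterable<String> taggedValues) {
        Long days = null;
        List<Integer> scores = new ArrayList<>();
        for (String v : taggedValues) {
            if (v.startsWith("D:")) {
                days = Long.parseLong(v.substring(2));
            } else if (v.startsWith("R:")) {
                scores.add(Integer.parseInt(v.substring(2)));
            }
        }
        List<Map.Entry<Long, Integer>> joined = new ArrayList<>();
        if (days == null) {
            return joined; // no delivered order for these reviews: inner join drops them
        }
        for (int score : scores) {
            joined.add(Map.entry(days, score));
        }
        return joined;
    }

    // Stage 2 (aggregate): all scores for ONE delivery time -> their average.
    static double averageReduce(Iterable<Integer> scores) {
        long sum = 0;
        long count = 0;
        for (int s : scores) {
            sum += s;
            count++;
        }
        return (double) sum / count;
    }

    // Stands in for the framework: groups stage-1 output by delivery days
    // (the second job's shuffle) and applies stage 2 to each group.
    static Map<Long, Double> run(Map<String, List<String>> valuesByOrderId) {
        Map<Long, List<Integer>> scoresByDays = new TreeMap<>();
        for (List<String> values : valuesByOrderId.values()) {
            for (Map.Entry<Long, Integer> pair : joinReduce(values)) {
                scoresByDays.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<Long, Double> averages = new TreeMap<>();
        scoresByDays.forEach((days, scores) -> averages.put(days, averageReduce(scores)));
        return averages;
    }
}
```

If the second job uses a numeric key type such as LongWritable for the delivery days, Hadoop sorts the keys numerically during the shuffle, so a re-sort like the one in QuestionReducer.cleanup would not be needed (with a single reducer).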