当数据来源不同的时候,比如用户表在MYSQL数据库中,而销售表在HDFS中,我们可以启动多个作业来依次处理这些数据源。
#需求
#用户表user在MYSQL数据库中,数据如下:
1 liaozhongmin
2 lavimer
3 liaozemin
#销售表user_data在HDFS中,数据如下:
1 12
2 28
2 36
3 88
#我们现在的需求是要统计每个用户的销售情况,结果应该如下显示:
1 liaozhongmin 12
2 lavimer 64
3 liaozemin 88
MultiJob1.java从数据库中读取数据并进行处理:
public class MultiJob1 {
public static class Step1Mapper extends Mapper<LongWritable, User, Text, Text>{
//创建输出的key
private Text outKey = new Text();
private Text outValue = new Text();
protected void map(LongWritable key, User value, Mapper<LongWritable, User, Text, Text>.Context context) throws IOException, InterruptedException {
//设置key
outKey.set(String.valueOf(value.getId()));
//设置写出去的value
outValue.set(value.getName());
//把结果写出去
context.write(outKey, outValue);
}
}
public static class Step1Reducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
for (Text val : values){
context.write(key, val);
}
}
}
/**
* 运行job的方法
* @param path
*/
public static void run(Map<String, String> path){
try {
//创建配置信息
Configuration conf = new Configuration();
//通过conf创建数据库配置信息
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB","root","134045");
//从map集合中取出输出路径
String step1OutPath = path.get("step1Output");
//创建文件系统
FileSystem fileSystem = FileSystem.get(new URI(step1OutPath), conf);
//如果输出目录存在就删除
if (fileSystem.exists(new Path(step1OutPath))){
fileSystem.delete(new Path(step1OutPath),true);
}
//创建任务
Job job = new Job(conf,MultiJob1.class.getName());
//1.1 设置输入数据格式化的类和设置数据来源
job.setInputFormatClass(DBInputFormat.class);
DBInputFormat.setInput(job, User.class, "user", null, null, new String[]{"id","name"});
//1.2 设置自定义的Mapper类和Mapper输出的key和value的类型
job.setMapperClass(Step1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//1.3 设置分区和reduce数量(reduce的数量和分区的数量对应,因为分区只有一个,所以reduce的个数也设置为一个)
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 排序
//1.5 归约
//2.1 Shuffle把数据从Map端拷贝到Reduce端
//job.setCombinerClass(Step1Reducer.class);
//2.2 指定Reducer类和输出key和value的类型
job.setReducerClass(Step1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.3 指定输出的路径和设置输出的格式化类
FileOutputFormat.setOutputPath(job, new Path(step1OutPath));
job.setOutputFormatClass(TextOutputFormat.class);
//提交作业 然后关闭虚拟机正常退出
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
MultiJob2.java从HDFS中读取数据并且和第一个Job处理后的结果进行合并:
public class MultiJob2 {
// 定义一个输入路径用于判断当前处理的是来自哪里的文件
private static String FILE_PATH = "";
public static class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 获取文件的输入路径
FileSplit fileSplit = (FileSplit) context.getInputSplit();
FILE_PATH = fileSplit.getPath().toString();
// 获取输入行记录
String line = value.toString();
// 抛弃无效记录(这里最好使用计数器统计一下无效记录)
if (line == null || line.equals("")) {
return;
}
// 处理来自数据库中的中间结果
if (FILE_PATH.contains("part")) {
// 按制表符进行切割
String[] values = line.split("\t");
// 当数组长度小于2的时候,视为无效记录
if (values.length < 2) {
return;
}
// 获取id和name
String id = values[0];
String name = values[1];
// 把结果写出去
context.write(new Text(id), new Text(name));
} else if (FILE_PATH.contains("user_data")) {
// 按制表符进行切割
String[] values = line.split("\t");
// 当数组长度小于2的时候,视为无效记录
if (values.length < 2) {
return;
}
// 获取id和grade
String id = values[0];
String score = values[1];
// 把结果写出去
context.write(new Text(id), new Text(score));
}
}
}
public static class Step2Reducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// 用来存放来自数据库的中间结果
Vector<String> vectorDB = new Vector<String>();
// 用来存放来自HDFS中处理后的结果
Vector<String> vectorHDFS = new Vector<String>();
// 迭代数据键对应的数据添加到相应Vector中
for (Text val : values) {
if (val.toString().startsWith("db#")) {
vectorDB.add(val.toString().substring(3));
} else if (val.toString().startsWith("hdfs#")) {
vectorHDFS.add(val.toString().substring(5));
}
}
// 获取两个Vector集合的长度
int sizeA = vectorDB.size();
int sizeB = vectorHDFS.size();
// 做笛卡尔积
for (int i = 0; i < sizeA; i++) {
for (int j = 0; j < sizeB; j++) {
context.write(new Text(key), new Text(vectorDB.get(i) + "\t" + vectorHDFS.get(j)));
}
}
}
}
/**
* 自定义Combiner
*
* @author 廖钟民 time : 2015年1月25日下午1:39:51
* @version
*/
public static class Step2Combiner extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
int sum = 0;
//处理来自数据库中的数据
if (FILE_PATH.contains("part")) {
for (Text val : values) {
context.write(key, new Text("db#" + val.toString()));
}
} else {//处理来自HDFS中的数据
for (Text val : values) {
sum += Integer.parseInt(val.toString());
}
context.write(key, new Text("hdfs#" + String.valueOf(sum)));
}
}
}
public static void run(Map<String, String> paths) {
try {
// 创建配置信息
Configuration conf = new Configuration();
// 从Map集合中获取输入输出路径
String step2Input1 = paths.get("step2Input1");
String step2Input2 = paths.get("step2Input2");
String step2Output = paths.get("step2Output");
// 创建文件系统
FileSystem fileSystem = FileSystem.get(new URI(step2Output), conf);
// 如果输出目录存在,我们就删除
if (fileSystem.exists(new Path(step2Output))) {
fileSystem.delete(new Path(step2Output), true);
}
// 创建任务
Job job = new Job(conf, MultiJob2.class.getName());
// 1.1 设置输入目录和设置输入数据格式化的类
FileInputFormat.addInputPath(job, new Path(step2Input1));
FileInputFormat.addInputPath(job, new Path(step2Input2));
job.setInputFormatClass(TextInputFormat.class);
//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
job.setMapperClass(Step2Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// 1.4 排序
// 1.5 归约
job.setCombinerClass(Step2Combiner.class);
// 2.1 Shuffle把数据从Map端拷贝到Reduce端。
// 2.2 指定Reducer类和输出key和value的类型
job.setReducerClass(Step2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 2.3 指定输出的路径和设置输出的格式化类
FileOutputFormat.setOutputPath(job, new Path(step2Output));
job.setOutputFormatClass(TextOutputFormat.class);
// 提交作业 退出
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
MultiJobTest.java作业调度控制类:
public class MultiJobTest {
//定义HDFS的路径
public static final String HDFS = "hdfs://liaozhongmin:9000";
public static void main(String[] args) {
//定义一个map集合用于存储操作参数
Map<String, String> paths = new HashMap<String, String>();
//存储第一步的输出路径(第一步是从数据库中去取数据,没有输入路径)
paths.put("step1Output", HDFS + "/step1_Out");
//存储第二部的输入路径(第二个参数是多参数输入的)
paths.put("step2Input1", HDFS + "/step2_inpath/user_data");
paths.put("step2Input2", HDFS + "/step1_Out/part-*");
paths.put("step2Output", HDFS + "/step2_out");
//依次运行job
MultiJob1.run(paths);
MultiJob2.run(paths);
System.exit(0);
}
public static JobConf config(){
//创建配置
JobConf conf = new JobConf(MultiJobTest.class);
conf.setJobName("MultiJobTest");
//设置配置文件
/*conf.addResource("classpath:/hadoop/core-site.xml");
conf.addResource("classpath:/hadoop/hdfs-site.xml");
conf.addResource("classpath:/hadoop/mapred-site.xml");*/
conf.set("io.sort.mb", "1024");
return conf;
}
}
程序运行结果如下:
原文转自:https://blog.csdn.net/lzm1340458776/article/details/43114611
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://marco.blog.csdn.net/article/details/81632282
内容来源于网络,如有侵权,请联系作者删除!