例如,我有两个文件,a和b。它们都是文本文件。
包含一行文本,如下所示:
I'm A
b包含三行文字,如下所示:
I'm B1
I'm B2
I'm B3
我的例外是在每行的前面加上a的唯一一行,所以结果是:
I'm A I'm B1
I'm A I'm B2
I'm A I'm B3
对我来说最困难的部分是如何理解mapreduce job的参数。在传统函数中,我可以让a和b作为两个参数,然后将它们合并到规则中。一些伪代码如下:
// Two parameters, A and B
public void merge(File A, File B) {
String lineA = A.firstLine;
for (Each lineB in B) {
String result = lineA + " " + lineB;
File.wirte(result, "ResultFile", appended);
}
}
但是在mapreduce阶段,对于exmaple没有“传统参数”:
public void map(LongWritable key, Text value, Context context) {
String line = value.toString();
try {
context.write(new Text(line), new Text(""));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
我知道上面的代码会逐行读取文件,使用每一行作为键并将值设置为空,然后将键值对写入文件。
但同样,上面的Map器代码没有传统的参数,我不知道如何执行合并字符串的逻辑,并写出结果。
下面是我现在所想的伪代码,是我有正确的设计还是我走错了方向?我应该如何设计mapreduce作业?一些示例代码将非常有用!
// I think I should pass in two Files
// So I can merge the line of them
public void map(File A, File B, Context context) {
String key = A.firstLine();
for (each LineB in B) {
String value = key + " " + lineB;
}
try {
context.write(new Text(""), value);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
1条答案
按热度按时间y53ybaqx1#
这是一个交叉连接-将a中的每个记录连接到b中的每个记录。幸运的是,这是一个覆盖得很好的mapreduce设计模式。
解决方案取决于数据集的大小。如果a或b能进入记忆,你就很幸运了。只需将其中一个添加到distributedcache,并只将另一个读入Map器。你甚至不需要减速机!
如果两个数据集都很大,那么您的解决方案就必须包括编写一个自定义的输入拆分,以保证每组数据的输出次数足以与另一组数据中的所有块匹配。请参阅mapreduce中的交叉产品-本示例也包含在mapreduce设计模式一书中。