hadoop:reducer的输入记录数

mzaanser  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(345)

每个reducer进程是否都可以确定它必须处理的元素或记录的数量?

cl25kdpy

cl25kdpy1#

reducer类必须扩展mapreducer reduce类: Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 然后必须使用扩展reduce类中指定的keyin/valuein参数实现reduce方法 reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context) 与给定键关联的值可以通过

int count = 0;
Iterator<VALUEIN> it = values.iterator();
while(it.hasNext()){
  it.Next();
  count++;
}

尽管我建议在其他处理的同时进行计数,以免两次通过值集。
编辑
下面是一个向量的示例,它将随着添加而动态增长(因此不必静态声明数组,因此不需要值集的大小)。这对非常规数据最有效(即输入csv文件中的每一行的列数不同),但开销最大。

Vector table = new Vector();

Iterator<Text> it = values.iterator();
while(it.hasNext()){

  Text t = it.Next();
  String[] cols = t.toString().split(",");   

  int i = 0;
  Vector row = new Vector(); //new vector will be our row
  while(StringUtils.isNotEmpty(cols[i])){
    row.addElement(cols[i++]); //here were adding a new column for every value in the csv row
  }

  table.addElement(row);
}

然后可以通过访问第n行的第m列

table.get(N).get(M);

现在,如果您知道将设置列的#,您可以修改它以使用数组向量,这可能会更快/更节省空间。

gfttwv5a

gfttwv5a2#

简短回答-提前回答不,reducer不知道iterable支持多少值。唯一可以做到这一点的方法是在迭代时进行计数,但不能再次对iterable进行迭代。
长答案-支持iterable实际上是序列化键/值对的排序字节数组。reducer有两个比较器—一个按键顺序对键/值对进行排序,另一个确定键之间的边界(称为键分组器)。通常,密钥grouper与密钥排序比较器相同。
迭代特定键的值时,基础上下文检查数组中的下一个键,并使用分组比较器将其与上一个键进行比较。如果比较器确定它们相等,则迭代继续。否则,这个特定键的迭代就结束了。因此您可以看到,您无法提前确定将如何为任何特定键传递值。
如果您创建一个复合键,比如一个text/intwritable对,您实际上可以看到这一点。对于compareto方法,首先按文本排序,然后按intwritable字段排序。接下来创建一个比较器用作组比较器,它只考虑键的文本部分。现在,当您迭代reducer中的值时,您应该能够观察到键的可写部分随着每次迭代而变化。
我以前用来演示这个场景的一些代码可以在这个pastebin上找到

相关问题