mapreduce—如何在hadoop的默认linerecordreader中设置值

mfpqipee  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(343)

一旦jobtracker使用inputformat类的getsplits()函数获取拆分。然后jobtracker根据拆分的存储位置分配maptasks,maptask调用inputformat类中的createrecordreader()方法,该方法反过来调用linerecordreader类。initialize函数获取开始位置和结束位置,nextkeyvalue()设置键、值。我的查询是这样的,key是按照下面的代码用pos设置的,但是这个值是怎么设置的呢。

public boolean  nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
     }
     key.set(pos);
     if (value == null) {
       value = new Text();
     }
     int newSize = 0;
     while (pos < end) {
       newSize = in.readLine(value, maxLineLength,
                             Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength))
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }
      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }
f0ofjuux

f0ofjuux1#

在nextkeyvalue()中,计算时

newSize = in.readLine(value, maxLineLength,
                             Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength))

在这里,readline将把数据填充到value对象中。您可以在这里参考readline实现

if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
       txtLength += appendLength;
     }

您可以在中参阅本文,以了解实际的传递值是如何工作的。

相关问题