This article collects Java code examples for the org.apache.hadoop.mapred.Mapper class and shows how the Mapper class is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, Maven and similar platforms, extracted from selected projects, so they should serve as useful references. Details of the Mapper class are as follows:
Package path: org.apache.hadoop.mapred.Mapper
Class name: Mapper
[EN] Maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the JobConf for the job via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use the Closeable#close() method for de-initialization.
The framework then calls #map(Object, Object, OutputCollector, Reporter) for each key/value pair in the InputSplit for that task.
All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the grouping by specifying a Comparator via JobConf#setOutputKeyComparatorClass(Class).
The grouped Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
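For illustration, a custom Partitioner can be a small class like the following sketch, registered via JobConf#setPartitionerClass(Class); the name MyPartitioner and the Text/IntWritable key and value types are assumptions made for this example, not part of the Javadoc:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class MyPartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // no per-job setup needed for this sketch
  }

  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // route records by key hash; mask the sign bit so the index is non-negative
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}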
Users can optionally specify a combiner, via JobConf#setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
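A minimal sketch of wiring these hooks onto the JobConf; MyReducer and MyKeyComparator are hypothetical classes named here only to show the calls, and MyPartitioner is the sketch from above:

JobConf conf = new JobConf();
// reuse the reducer as a combiner for local aggregation of map outputs
conf.setCombinerClass(MyReducer.class);
// control how output keys are compared (and hence grouped), as described above
conf.setOutputKeyComparatorClass(MyKeyComparator.class);
// route keys to reduce partitions with the custom partitioner sketched earlier
conf.setPartitionerClass(MyPartitioner.class);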
The intermediate, grouped outputs are always stored in SequenceFiles. Applications can specify if and how the intermediate outputs are to be compressed, and which CompressionCodecs are to be used, via the JobConf.
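For example, the map-output compression switches on the same JobConf look roughly like this (GzipCodec is just one possible CompressionCodec):

// compress the intermediate map outputs with the chosen codec
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(org.apache.hadoop.io.compress.GzipCodec.class);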
If the job has zero reduces then the output of the Mapper is directly written to the FileSystem without grouping by keys.
Example:
public class MyMapper<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Mapper<K, V, K, V> {

  static enum MyCounters { NUM_RECORDS }

  private String mapTaskId;
  private String inputFile;
  private int noRecords = 0;

  public void configure(JobConf job) {
    mapTaskId = job.get("mapred.task.id");
    inputFile = job.get("mapred.input.file");
  }

  public void map(K key, V val,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Process the <key, value> pair (assume this takes a while)
    // ...
    // ...

    // Let the framework know that we are alive, and kicking!
    // reporter.progress();

    // Process some more
    // ...
    // ...

    // Increment the no. of <key, value> pairs processed
    ++noRecords;

    // Increment counters
    reporter.incrCounter(MyCounters.NUM_RECORDS, 1);

    // Every 100 records update application-level status
    if ((noRecords % 100) == 0) {
      reporter.setStatus(mapTaskId + " processed " + noRecords +
                         " from input-file: " + inputFile);
    }

    // Output the result
    output.collect(key, val);
  }
}
Applications may write a custom MapRunnable to exert greater control on map processing, e.g. multi-threaded Mappers and so on.
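To tie the pieces together, here is a hedged driver sketch that submits the MyMapper example above as a map-only job and opts into the library's multi-threaded runner; the class name MyJob, the job name and the command-line paths are assumptions made for illustration:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

public class MyJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MyJob.class);
    conf.setJobName("my-mapper-demo");

    conf.setMapperClass(MyMapper.class);
    // Optional: run several map() calls in parallel threads per task;
    // the mapper must then be thread-safe (the noRecords counter in
    // MyMapper above would need synchronization).
    conf.setMapRunnerClass(MultithreadedMapRunner.class);

    // Zero reduces: the map output is written directly to the FileSystem.
    conf.setNumReduceTasks(0);
    // With the default TextInputFormat, the mapper sees LongWritable offsets
    // as keys and Text lines as values.
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}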
Code example origin: apache/flink
@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  // hand each Flink record to the wrapped Hadoop Mapper; its OutputCollector
  // forwards results to the Flink Collector
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}
Code example origin: apache/ignite
// excerpt: inside the record loop a cancelled task throws, otherwise each
// record is passed to mapper.map(), and mapper.close() runs after the loop
throw new HadoopTaskCancelledException("Map task cancelled.");
mapper.map(key, val, collector, reporter);
mapper.close();
Code example origin: apache/flink
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // configure the wrapped Hadoop Mapper and set up a dummy reporter and
  // output collector when the Flink operator opens
  this.mapper.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
Code example origin: org.apache.hadoop/hadoop-mapred
public void close() throws IOException {
  if (mapper != null) {
    mapper.close();
  }
}
Code example origin: apache/chukwa
public void testSetDefaultMapProcessor() throws IOException {
  Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
      new Demux.MapClass();

  JobConf conf = new JobConf();
  conf.set("chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
  mapper.configure(conf);

  ChunkBuilder cb = new ChunkBuilder();
  cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
  ChunkImpl chunk = (ChunkImpl) cb.getChunk();

  ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
      new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();

  mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
  ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);

  assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
  assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
}
Code example origin: com.github.jiayuhan-it/hadoop-mapreduce-client-core
public void close() throws IOException {
  if (mapper != null) {
    mapper.close();
  }
}
Code example origin: apache/chukwa
public void testSetCustomeMapProcessor() throws IOException {
  Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
      new Demux.MapClass();
  String custom_DataType = "cus_dt";

  JobConf conf = new JobConf();
  conf.set(custom_DataType,
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
  mapper.configure(conf);

  ChunkBuilder cb = new ChunkBuilder();
  cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
  ChunkImpl chunk = (ChunkImpl) cb.getChunk();
  chunk.setDataType(custom_DataType);

  ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
      new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();

  mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
  ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);

  assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
  assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
}
Code example origin: org.apache.flink/flink-hadoop-compatibility_2.11
@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}
Code example origin: io.hops/hadoop-mapreduce-client-core
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    // allocate key & value instances that are re-used for all entries
    K1 key = input.createKey();
    V1 value = input.createValue();

    while (input.next(key, value)) {
      // map pair to output
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
Code example origin: org.jvnet.hudson.hadoop/hadoop-core
public void close() throws IOException {
  if (mapper != null) {
    mapper.close();
  }
}
Code example origin: org.apache.crunch/crunch-core
@Override
public void initialize() {
  // lazily instantiate the wrapped Mapper and configure it from the job configuration
  if (instance == null) {
    this.instance = ReflectionUtils.newInstance(mapperClass, getConfiguration());
  }
  instance.configure(new JobConf(getConfiguration()));
  outputCollector = new OutputCollectorImpl<K2, V2>();
}
Code example origin: org.apache.flink/flink-hadoop-compatibility
@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}
Code example origin: org.apache.hadoop/hadoop-mapred
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    // allocate key & value instances that are re-used for all entries
    K1 key = input.createKey();
    V1 value = input.createValue();

    while (input.next(key, value)) {
      // map pair to output
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
Code example origin: io.hops/hadoop-mapreduce-client-core
public void close() throws IOException {
  if (mapper != null) {
    mapper.close();
  }
}
Code example origin: org.apache.flink/flink-hadoop-compatibility
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.mapper.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
Code example origin: com.alibaba.blink/flink-hadoop-compatibility
@Override
public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
    throws Exception {
  outputCollector.setFlinkCollector(out);
  mapper.map(value.f0, value.f1, outputCollector, reporter);
}
Code example origin: io.prestosql.hadoop/hadoop-apache
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    // allocate key & value instances that are re-used for all entries
    K1 key = input.createKey();
    V1 value = input.createValue();

    while (input.next(key, value)) {
      // map pair to output
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
Code example origin: ch.cern.hadoop/hadoop-mapreduce-client-core
public void close() throws IOException {
  if (mapper != null) {
    mapper.close();
  }
}
Code example origin: org.apache.flink/flink-hadoop-compatibility_2.11
@Override
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  this.mapper.configure(jobConf);
  this.reporter = new HadoopDummyReporter();
  this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
Code example origin: io.hops/hadoop-mapreduce-client-core
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
                Reporter reporter) throws IOException {
  if (mapper == null) {
    // Find the Mapper from the TaggedInputSplit.
    TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(
        inputSplit.getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}