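This article collects code examples for the Java class org.apache.hadoop.mapreduce.lib.output.MultipleOutputs and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the MultipleOutputs class are as follows:
Package path: org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
Class name: MultipleOutputs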
The MultipleOutputs class simplifies writing output data to multiple outputs.
Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.
Case two: writing data to different files whose names are provided by the user.
MultipleOutputs supports counters; by default they are disabled. The counters group is the MultipleOutputs class name. The names of the counters are the same as the output names. They count the number of records written to each output name.
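The counter behaviour described above can be pictured with a small plain-Java sketch. It does not use Hadoop at all: `OutputCounters` and its methods are hypothetical names that only mimic the bookkeeping (one counter per output name, incremented once per record, and only when counting is switched on). In a real job the switch is `MultipleOutputs.setCountersEnabled(job, true)`, as several of the project examples further down show.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for per-output record counters; not Hadoop code. */
final class OutputCounters {
    private final boolean enabled;
    private final Map<String, Long> counts = new TreeMap<>();

    OutputCounters(boolean enabled) {
        this.enabled = enabled;
    }

    /** Called once per record written to the given output name. */
    void recordWritten(String outputName) {
        if (enabled) {
            counts.merge(outputName, 1L, Long::sum);
        }
    }

    /** Returns the number of records counted for an output name (0 if none). */
    long get(String outputName) {
        return counts.getOrDefault(outputName, 0L);
    }

    public static void main(String[] args) {
        OutputCounters counters = new OutputCounters(true);
        counters.recordWritten("text");
        counters.recordWritten("seq");
        counters.recordWritten("seq");
        System.out.println("text=" + counters.get("text") + ", seq=" + counters.get("seq"));
    }
}
```

With counting disabled (the default described above), `recordWritten` is a no-op and every counter stays at zero.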
Usage pattern for job submission:
Job job = Job.getInstance();
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
// Defines additional single text based output 'text' for the job
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
    LongWritable.class, Text.class);
// Defines additional sequence-file based output 'seq' for the job
MultipleOutputs.addNamedOutput(job, "seq",
    SequenceFileOutputFormat.class,
    LongWritable.class, Text.class);
...
job.waitForCompletion(true);
...
Usage in Reducer:
<K, V> String generateFileName(K k, V v) {
    return k.toString() + "_" + v.toString();
}
public class MOReduce extends
        Reducer<WritableComparable, Writable, WritableComparable, Writable> {
    private MultipleOutputs mos;
    public void setup(Context context) {
        ...
        mos = new MultipleOutputs(context);
    }
    public void reduce(WritableComparable key, Iterable<Writable> values,
            Context context)
            throws IOException, InterruptedException {
        ...
        // write to the named output "text"
        mos.write("text", key, new Text("Hello"));
        // write to the named output "seq", under two different base output paths
        mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
        mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
        // write to a file named by generateFileName(), without a named output
        mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
        ...
    }
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
        ...
    }
}
When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat from the old Hadoop API, i.e. output can be written from the Reducer to more than one location.
Use MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath) to write key and value to a path specified by baseOutputPath, with no need to specify a named output:
private MultipleOutputs<Text, Text> out;
public void setup(Context context) {
    out = new MultipleOutputs<Text, Text>(context);
    ...
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    for (Text t : values) {
        out.write(key, t, generateFileName(<parameter list...>));
    }
}
protected void cleanup(Context context) throws IOException, InterruptedException {
    out.close();
}
Use your own code in generateFileName() to create a custom path to your results. '/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append "part" or similar to your custom-generated path, otherwise your output files will be named -00000, -00001, etc. No call to context.write() is necessary. See the example generateFileName() code below.
private String generateFileName(Text k) {
    // expect Text k in format "Surname|Forename"
    String[] kStr = k.toString().split("\\|");
    String sName = kStr[0];
    String fName = kStr[1];
    // example for k = Smith|John
    // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
    return sName + "/" + fName;
}
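To make the resulting file names concrete, here is a minimal plain-Java sketch of the naming pattern shown in the comment above (`<output dir>/<baseOutputPath>-r-00000`). It is only an illustration and not Hadoop's own code: `OutputFileNames`, `resolve`, and `basePathFor` are hypothetical names, and the sketch assumes the suffix is the task type (`m` or `r`) followed by a five-digit partition number.

```java
import java.util.Locale;

/** Illustrative only: mirrors the naming shown in the comment above, not Hadoop's code. */
final class OutputFileNames {

    /**
     * Builds "<outputDir>/<baseOutputPath>-<m|r>-<5-digit partition>".
     * taskType is 'm' for map-side output and 'r' for reduce-side output.
     */
    static String resolve(String outputDir, String baseOutputPath, char taskType, int partition) {
        return String.format(Locale.ROOT, "%s/%s-%c-%05d", outputDir, baseOutputPath, taskType, partition);
    }

    /** Same splitting rule as generateFileName(): "Surname|Forename" becomes "Surname/Forename". */
    static String basePathFor(String key) {
        String[] parts = key.split("\\|");
        return parts[0] + "/" + parts[1];
    }

    public static void main(String[] args) {
        String base = basePathFor("Smith|John");
        // prints /user/hadoop/path/to/output/Smith/John-r-00000
        System.out.println(resolve("/user/hadoop/path/to/output", base, 'r', 0));
        // a base path ending in '/' leaves the file name itself empty:
        // prints /user/hadoop/path/to/output/Smith/-r-00001
        System.out.println(resolve("/user/hadoop/path/to/output", "Smith/", 'r', 1));
    }
}
```

The second call shows why the text recommends appending "part" or similar: when the base path ends in a directory separator, nothing is left for the file name except the generated suffix.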
Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-00000. To prevent this, use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.
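The difference between the two configurations can be sketched without Hadoop. The classes below are hypothetical and do not reproduce how LazyOutputFormat is implemented; they only contrast a writer that creates its file on construction with one that waits for the first record, which is the idea behind avoiding zero-sized default output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of lazy output creation; not Hadoop's LazyOutputFormat. */
final class LazyOutputSketch {

    /** Eager writer: registers ("creates") its file as soon as it is constructed. */
    static final class EagerWriter {
        EagerWriter(String fileName, List<String> createdFiles) {
            createdFiles.add(fileName);
        }

        void write(String record) {
            // a real writer would append the record here
        }
    }

    /** Lazy writer: registers its file only when the first record is written. */
    static final class LazyWriter {
        private final String fileName;
        private final List<String> createdFiles;
        private boolean opened;

        LazyWriter(String fileName, List<String> createdFiles) {
            this.fileName = fileName;
            this.createdFiles = createdFiles;
        }

        void write(String record) {
            if (!opened) {
                createdFiles.add(fileName);
                opened = true;
            }
            // a real writer would append the record here
        }
    }

    public static void main(String[] args) {
        List<String> eagerFiles = new ArrayList<>();
        new EagerWriter("part-r-00000", eagerFiles); // never written to
        List<String> lazyFiles = new ArrayList<>();
        new LazyWriter("part-r-00000", lazyFiles);   // never written to
        // prints "1 vs 0": only the eager writer left an empty file behind
        System.out.println(eagerFiles.size() + " vs " + lazyFiles.size());
    }
}
```

When every record goes through MultipleOutputs and nothing is written with context.write(), the default output behaves like the unused writer here, so the lazy variant leaves no empty file.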
Code example source: apache/kylin
private void outputDimRangeInfo() throws IOException, InterruptedException {
    if (col != null && minValue != null) {
        // output written to baseDir/colName/colName.dci-r-00000 (etc)
        String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
        mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
                new Text(minValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
        mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
                new Text(maxValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
        logger.info("write dimension range info for col : " + col.getName() + " minValue:" + minValue
                + " maxValue:" + maxValue);
    }
}
Code example source: apache/kylin
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    mos = new MultipleOutputs(context);
    String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
    String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
    KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
    CubeManager cubeManager = CubeManager.getInstance(config);
    CubeInstance cube = cubeManager.getCube(cubeName);
    CubeSegment optSegment = cube.getSegmentById(segmentID);
    CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
    rowKeySplitter = new RowKeySplitter(originalSegment);
    baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
    recommendCuboids = cube.getCuboidsRecommend();
    Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
}
Code example source: apache/kylin
private void setupReducer(Path output, int numberOfReducers) throws IOException {
    job.setReducerClass(UHCDictionaryReducer.class);
    job.setPartitionerClass(UHCDictionaryPartitioner.class);
    job.setNumReduceTasks(numberOfReducers);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
    // prevent creation of zero-sized default output
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    deletePath(job.getConfiguration(), output);
}
Code example source: apache/kylin
private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
    // output written to baseDir/colName/colName.rldict-r-00000 (etc)
    String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) {
        outputStream.writeUTF(dict.getClass().getName());
        dict.write(outputStream);
        mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
    }
    mos.close();
}
}
Code example source: ch.cern.hadoop/hadoop-mapreduce-client-core
/**
 * Adds a named output for the job.
 *
 * @param job               job to add the named output
 * @param namedOutput       named output name, it has to be a word, letters
 *                          and numbers only, cannot be the word 'part' as
 *                          that is reserved for the default output.
 * @param outputFormatClass OutputFormat class.
 * @param keyClass          key class
 * @param valueClass        value class
 */
@SuppressWarnings("unchecked")
public static void addNamedOutput(Job job, String namedOutput,
        Class<? extends OutputFormat> outputFormatClass,
        Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
            conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
            OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
}
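The registration logic above can be imitated in plain Java to see what ends up in the configuration. This is a sketch under stated assumptions, not the library's code: a `Map` stands in for `Configuration`, `NamedOutputRegistry` and the key `example.multipleoutputs` are hypothetical, the name rule is taken from the Javadoc (letters and numbers only, never "part"), and rejecting duplicates is an assumption about what the `true` flag passed to `checkNamedOutputName` asks for.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

/** Plain-Java imitation of the registration logic above; a Map stands in for Configuration. */
final class NamedOutputRegistry {
    // Placeholder key, chosen for this sketch only.
    private static final String OUTPUTS_KEY = "example.multipleoutputs";
    private final Map<String, String> conf = new HashMap<>();

    /** Letters and digits only, non-empty, and not the reserved word "part". */
    static boolean isValidName(String name) {
        if (name == null || name.isEmpty() || name.equals("part")) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean ok = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9');
            if (!ok) {
                return false;
            }
        }
        return true;
    }

    /** Registers a name, using the same append-with-space pattern as conf.set(MULTIPLE_OUTPUTS, ...). */
    void add(String name) {
        if (!isValidName(name)) {
            throw new IllegalArgumentException("Invalid named output: " + name);
        }
        if (names().contains(name)) {
            throw new IllegalArgumentException("Named output already defined: " + name);
        }
        conf.put(OUTPUTS_KEY, conf.getOrDefault(OUTPUTS_KEY, "") + " " + name);
    }

    /** Splits the space-separated list back into individual names, in registration order. */
    List<String> names() {
        List<String> result = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(conf.getOrDefault(OUTPUTS_KEY, ""), " ");
        while (tokens.hasMoreTokens()) {
            result.add(tokens.nextToken());
        }
        return result;
    }

    public static void main(String[] args) {
        NamedOutputRegistry registry = new NamedOutputRegistry();
        registry.add("text");
        registry.add("seq");
        System.out.println(registry.names()); // prints [text, seq]
    }
}
```

Because the names are stored as one space-separated string, a name containing a space or punctuation could not be read back unambiguously, which is one plausible reason for the letters-and-numbers restriction.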
Code example source: geftimov/hadoop-map-reduce-patterns
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    Job countingJob = new Job(conf, "JobChaining-Counting");
    countingJob.setJarByClass(BasicJobChaining.class);
    countingJob.setMapperClass(UserIdCountMapper.class);
    countingJob.setCombinerClass(LongSumReducer.class);
    countingJob.setReducerClass(UserIdSumReducer.class);
    Job binningJob = new Job(new Configuration(), "JobChaining-Binning");
    binningJob.setJarByClass(BasicJobChaining.class);
    MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_BELOW_NAME,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_ABOVE_NAME,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(binningJob, true);
Code example source: geftimov/hadoop-map-reduce-patterns
@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    String[] otherArgs = parser.getRemainingArgs();
    // (the excerpt omits the argument check that guards this exit)
    System.exit(2);
    Job job = new Job(conf, "Binning Tags");
    job.setJarByClass(BinningTags.class);
    MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,
            Text.class, NullWritable.class);
    MultipleOutputs.setCountersEnabled(job, true);
    job.setNumReduceTasks(0);
    job.setMapperClass(BinningMapper.class);
    job.setMapOutputKeyClass(Text.class);
Code example source: thinkaurelius/faunus
String hadoopFileJar = graph.getConf().get("mapred.jar", null);
if (null == hadoopFileJar) {
if (new File("target/" + Tokens.FAUNUS_JOB_JAR).exists()) {
job.getConfiguration().setBoolean(PATH_ENABLED, this.pathEnabled);
job.getConfiguration().set("mapred.jar", hadoopFileJar);
job.setInputFormatClass(this.graph.getGraphInputFormat());
if (FileInputFormat.class.isAssignableFrom(this.graph.getGraphInputFormat())) {
FileInputFormat.setInputPaths(job, this.graph.getInputLocation());
MultipleOutputs.addNamedOutput(job, Tokens.SIDEEFFECT, this.graph.getSideEffectOutputFormat(), job.getOutputKeyClass(), job.getOutputKeyClass());
MultipleOutputs.addNamedOutput(job, Tokens.GRAPH, this.graph.getGraphOutputFormat(), NullWritable.class, FaunusVertex.class);
} else {
LazyOutputFormat.setOutputFormatClass(job, INTERMEDIATE_OUTPUT_FORMAT);
MultipleOutputs.addNamedOutput(job, Tokens.SIDEEFFECT, this.graph.getSideEffectOutputFormat(), job.getOutputKeyClass(), job.getOutputKeyClass());
MultipleOutputs.addNamedOutput(job, Tokens.GRAPH, INTERMEDIATE_OUTPUT_FORMAT, NullWritable.class, FaunusVertex.class);
Code example source: ch.cern.hadoop/hadoop-mapreduce-client-jobclient
conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.JavaSerialization," +
        "org.apache.hadoop.io.serializer.WritableSerialization");
job.setJobName("mo");
MultipleOutputs.addNamedOutput(job, TEXT, TextOutputFormat.class,
        Long.class, String.class);
MultipleOutputs.setCountersEnabled(job, withCounters);
job.setSortComparatorClass(JavaSerializationComparator.class);
job.setMapOutputKeyClass(Long.class);
job.setMapOutputValueClass(String.class);
Code example source: ShifuML/shifu
private Job createSEMapReduceJob(SourceType source, Configuration conf, String varSelectMSEOutputPath)
throws IOException {
@SuppressWarnings("deprecation")
Job job = new Job(conf, "Shifu: Variable Selection Wrapper Job : " + this.modelConfig.getModelSetName());
job.setJarByClass(getClass());
boolean isSEVarSelMulti = Boolean.TRUE.toString().equalsIgnoreCase(
Environment.getProperty(Constants.SHIFU_VARSEL_SE_MULTI, Constants.SHIFU_DEFAULT_VARSEL_SE_MULTI));
if(isSEVarSelMulti) {
job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, VarSelectMapper.class);
int threads;
threads = Constants.SHIFU_DEFAULT_VARSEL_SE_MULTI_THREAD;
conf.setInt("mapreduce.map.cpu.vcores", threads);
MultithreadedMapper.setNumberOfThreads(job, threads);
} else {
MultipleOutputs.addNamedOutput(job, Constants.SHIFU_VARSELECT_SE_OUTPUT_NAME, TextOutputFormat.class,
Text.class, Text.class);
return job;
Code example source: pl.edu.icm.coansys/coansys-io-input
@Override
public int run(String[] args) throws Exception {
if ("DEBUG".equals(conf.get("job.logging"))) {
logger.setLevel(Level.DEBUG);
logger.debug("** Log Level set to DEBUG **");
Job job = new Job(conf, HBaseToDocumentProtoSequenceFile.class.getSimpleName() + "_" + tableName + "_" + outputDir);
job.setJarByClass(HBaseToDocumentProtoSequenceFile.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(BytesWritable.class);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputDir));
MultipleOutputs.addNamedOutput(job, FAMILY_METADATA_DOCUMENT_QUALIFIER_PROTO, SequenceFileOutputFormat.class, BytesWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, FAMILY_CONTENT_QUALIFIER_PROTO, SequenceFileOutputFormat.class, BytesWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, "dproto", SequenceFileOutputFormat.class, BytesWritable.class, BytesWritable.class);
boolean success = job.waitForCompletion(true);
Code example source: apache/kylin
@Override
public void doCleanup(Context context) throws IOException, InterruptedException {
    mos.close();
    Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
    FileSystem fs = FileSystem.get(context.getConfiguration());
    if (!fs.exists(outputDirBase)) {
        fs.mkdirs(outputDirBase);
        SequenceFile
                .createWriter(context.getConfiguration(),
                        SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
                        SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
                .close();
    }
}
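The cleanup step above closes MultipleOutputs and then creates a placeholder output when the expected directory is missing, presumably so that later steps still find something to read. The sketch below is a local-filesystem analogue using java.nio.file rather than Hadoop's FileSystem and SequenceFile APIs; `PlaceholderOutput`, `ensurePlaceholder`, and the directory name `base_cuboid` are hypothetical. Note one difference: the original writes an empty SequenceFile (which still carries a file header), while this sketch just creates a zero-byte file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local-filesystem analogue of the cleanup step above; not Kylin or Hadoop code. */
final class PlaceholderOutput {

    /**
     * If dir does not exist, creates it together with an empty "part-m-00000" file.
     * Returns true when the placeholder was created, false when dir already existed.
     */
    static boolean ensurePlaceholder(Path dir) throws IOException {
        if (Files.exists(dir)) {
            return false;
        }
        Files.createDirectories(dir);
        Files.createFile(dir.resolve("part-m-00000"));
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("mos-demo");
        Path cuboidBase = base.resolve("base_cuboid"); // hypothetical directory name
        System.out.println(ensurePlaceholder(cuboidBase)); // true: directory and file created
        System.out.println(ensurePlaceholder(cuboidBase)); // false: directory already exists
    }
}
```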
Code example source: ShifuML/shifu
/**
 * Do initialization like ModelConfig and ColumnConfig loading.
 */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    loadConfigFiles(context);
    int[] inputOutputIndex = DTrainUtils.getInputOutputCandidateCounts(modelConfig.getNormalizeType(), this.columnConfigList);
    this.inputNodeCount = inputOutputIndex[0] == 0 ? inputOutputIndex[2] : inputOutputIndex[0];
    this.filterOutRatio = context.getConfiguration().getFloat(Constants.SHIFU_VARSELECT_FILTEROUT_RATIO,
            Constants.SHIFU_DEFAULT_VARSELECT_FILTEROUT_RATIO);
    this.filterNum = context.getConfiguration().getInt(Constants.SHIFU_VARSELECT_FILTER_NUM,
            Constants.SHIFU_DEFAULT_VARSELECT_FILTER_NUM);
    this.outputKey = new Text();
    this.outputValue = new Text();
    this.filterBy = context.getConfiguration()
            .get(Constants.SHIFU_VARSELECT_FILTEROUT_TYPE, Constants.FILTER_BY_SE);
    this.mos = new MultipleOutputs<Text, Text>(context);
    LOG.info("FilterBy is {}, filterOutRatio is {}, filterNum is {}", filterBy, filterOutRatio, filterNum);
}
Code example source: org.apache.hadoop/hadoop-mapred-test
Job job = MapReduceTestUtil.createJob(conf, IN_DIR, OUT_DIR, 2, 1, input);
job.setJobName("mo");
MultipleOutputs.addNamedOutput(job, TEXT, TextOutputFormat.class,
        LongWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, SEQUENCE,
        SequenceFileOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.setCountersEnabled(job, withCounters);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
job.waitForCompletion(true);
Code example source: openimaj/openimaj
@Override
public int run(String[] args) throws Exception {
    final CmdLineParser parser = new CmdLineParser(this);
    try {
        parser.parseArgument(args);
    } catch (final CmdLineException e) {
        System.err.println(e.getMessage());
        System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]");
        parser.printUsage(System.err);
        return -1;
    }
    final Path[] paths = SequenceFileUtility.getFilePaths(input, "part");
    final Path outputPath = new Path(output);
    if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace)
        outputPath.getFileSystem(this.getConf()).delete(outputPath, true);
    final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf());
    job.setJarByClass(this.getClass());
    job.setMapperClass(PqPcaVladMapper.class);
    job.setNumReduceTasks(0);
    MultipleOutputs.addNamedOutput(job, "pcavlad", SequenceFileOutputFormat.class, Text.class, BytesWritable.class);
    DistributedCache.createSymlink(job.getConfiguration());
    DistributedCache.addCacheFile(new URI(indexerData + "#vlad-data.bin"), job.getConfiguration());
    SequenceFileOutputFormat.setCompressOutput(job, !dontcompress);
    job.waitForCompletion(true);
    return 0;
}
Code example source: com.expedia.dsp/datasqueeze
/**
 * {@inheritDoc}
 */
protected void reduce(final Text key, final Iterable<BytesWritable> values, final Context context) throws IOException, InterruptedException {
    final Configuration configuration = context.getConfiguration();
    final String sourcePath = configuration.get("compactionSourcePath");
    final String targetPath = configuration.get("compactionTargetPath");
    // Reducer stores data at the target directory retaining the directory structure of files
    String filePath = key.toString().replace(sourcePath, targetPath);
    if (key.toString().endsWith("/")) {
        filePath = filePath.concat("file");
    }
    log.info("Compaction output path {}", filePath);
    final URI uri = URI.create(filePath);
    final MultipleOutputs multipleOutputs = new MultipleOutputs<NullWritable, BytesWritable>(context);
    try {
        for (final BytesWritable text : values) {
            multipleOutputs.write(NullWritable.get(), text, uri.toString());
        }
    } finally {
        multipleOutputs.close();
    }
}
}
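The path rewriting at the start of the reducer above can be tried in isolation. The following sketch repeats only the two string operations from that excerpt; the class name `CompactionPaths` and the sample paths are made up for illustration.

```java
/** Isolates the path mapping used in the reducer above; the sample paths are hypothetical. */
final class CompactionPaths {

    /**
     * Swaps the source prefix for the target prefix and, when the key denotes a
     * directory (ends with "/"), appends "file" so the output has a file name.
     */
    static String targetFor(String key, String sourcePath, String targetPath) {
        String filePath = key.replace(sourcePath, targetPath);
        if (key.endsWith("/")) {
            filePath = filePath.concat("file");
        }
        return filePath;
    }

    public static void main(String[] args) {
        // prints /data/out/2020/a.txt
        System.out.println(targetFor("/data/in/2020/a.txt", "/data/in", "/data/out"));
        // prints /data/out/2020/file
        System.out.println(targetFor("/data/in/2020/", "/data/in", "/data/out"));
    }
}
```

One thing to keep in mind with this approach is that `String.replace` substitutes every occurrence of the source path, not just a leading prefix, so a key that happens to contain the source path twice would be rewritten in both places.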
Code example source: apache/incubator-rya
/**
 * Set up the MapReduce job to output a schema (TBox).
 */
protected void configureSchemaOutput() {
    Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
    SequenceFileOutputFormat.setOutputPath(job, outPath);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(SchemaWritable.class);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "schemaobj",
            SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
Code example source: thinkaurelius/faunus
public SafeMapperOutputs(final Mapper.Context context) {
    this.context = context;
    this.outputs = new MultipleOutputs(this.context);
    this.testing = this.context.getConfiguration().getBoolean(FaunusCompiler.TESTING, false);
}
Code example source: apache/kylin
final Job job = Job.getInstance(sConf.get());
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
        NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
        NullWritable.class, ArrayPrimitiveWritable.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
        LongWritable.class, BytesWritable.class);
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
        NullWritable.class, LongWritable.class);
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
Code example source: apache/incubator-rya
/**
 * Set up a MapReduce job to output human-readable text.
 */
protected void configureTextOutput(String destination) {
    Path outPath;
    outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
    TextOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
            TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}