So in my reducer I use MultipleOutputs to write different files into HDFS (the full reducer code is at the bottom of this question).
I want to unit test this with MRUnit (version 1.1 is required for MultipleOutputs support). Here is my test method:
@Test
public void testParseValidInput() throws IOException {
    List<Text> values = new ArrayList<Text>();
    values.add(new Text("{}"));
    reduceDriver.withInput(new Text("aProjectId/2014-04-22T15:42:27.000Z"), values)
            .withMultiOutput(null, new Text("{}"), new Text("aProjectId/2014/04/22/15/42"))
            .runTest();
}
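For completeness, reduceDriver is created in a @Before method along these lines (a minimal sketch using the standard org.apache.hadoop.mrunit.mapreduce.ReduceDriver API; field names are illustrative):

private ReduceDriver<Text, Text, Text, Text> reduceDriver;

@Before
public void setUp() {
    // new-API driver from org.apache.hadoop.mrunit.mapreduce, wrapping the reducer under test
    reduceDriver = ReduceDriver.newReduceDriver(new HacDataParReducer());
}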
However, when I run this test I get an NPE, thrown from the mos.write(...) call in the reducer:
java.lang.NullPointerException
at org.apache.hadoop.fs.Path.&lt;init&gt;(Path.java:105)
at org.apache.hadoop.fs.Path.&lt;init&gt;(Path.java:94)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getDefaultWorkFile(FileOutputFormat.java:286)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:129)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.getRecordWriter(MultipleOutputs.java:475)
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:457)
at com.here.hac.partition.HacDataParReducer.reduce(HacDataParReducer.java:71)
at com.here.hac.partition.HacDataParReducer.reduce(HacDataParReducer.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mrunit.mapreduce.ReduceDriver.run(ReduceDriver.java:268)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:625)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:612)
at com.here.hac.partition.HacDataParReducerTest.test2(HacDataParReducerTest.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Here is the reducer code:
package com.here.hac.partition;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Reducer class that writes output data to multiple files based on
 * partition rules.
 */
public class HacDataParReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;
    private Long currentTimeInSeconds = System.currentTimeMillis() / 1000;

    /**
     * Return a file name "path/to/file" based on a given (key, value) pair.
     * E.g. for a key in the format "<projectId>/<dateTime>", return the file name
     * "<projectId>/<year>/<month>/<day>/<hour>/<minute>-<currentTimeInSeconds>".
     *
     * @param key
     * @param value
     * @return File name string.
     */
    private String generateFileName(Text key, Text value) {
        String[] parseDate = key.toString().split(":");
        return buildPath(parseDate[0]) + "/" + parseDate[1] + "-" + currentTimeInSeconds;
    }

    /**
     * Replace '-' and 'T' in a string with '/'.
     *
     * E.g. for an input string such as "4spoo7qlgm/2014-04-22T15",
     * return "4spoo7qlgm/2014/04/22/15".
     *
     * @param str
     * @return
     */
    private String buildPath(String str) {
        StringBuffer newPath = new StringBuffer();
        for (char c : str.toCharArray()) {
            if (c == '-' || c == 'T')
                newPath.append('/');
            else
                newPath.append(c);
        }
        return newPath.toString();
    }

    /**
     * Set up data structures needed within the same reduce task.
     */
    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        this.currentTimeInSeconds = System.currentTimeMillis() / 1000;
        for (Text val : values) {
            String fileName = generateFileName(key, val);
            mos.write((Text) null, val, fileName);
        }
    }

    /**
     * Clean up any data structure that is initialized in the setup process.
     */
    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
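To make the path-building concrete, here is a standalone sketch (a hypothetical PathDemo class, not part of the job) that mirrors what generateFileName/buildPath produce for the key used in my test:

// Hypothetical standalone class, only to illustrate the file name the reducer
// passes to mos.write() for the test key.
public class PathDemo {
    public static void main(String[] args) {
        String key = "aProjectId/2014-04-22T15:42:27.000Z";
        String[] parseDate = key.split(":");   // ["aProjectId/2014-04-22T15", "42", "27.000Z"]
        long nowSeconds = System.currentTimeMillis() / 1000;
        String fileName = parseDate[0].replace('-', '/').replace('T', '/')
                + "/" + parseDate[1] + "-" + nowSeconds;
        System.out.println(fileName);          // aProjectId/2014/04/22/15/42-<epoch seconds>
    }
}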
Any suggestions?