我有几个csv文件和头总是在文件的第一行。在pig中,将csv文件中的那一行作为字符串输出的最佳方法是什么?不能使用sed、awk等进行预处理。我试过用常规pigstorage和piggy bank csvloader加载文件,但我不清楚如何才能得到第一行。如果需要的话,我愿意写一个自定义项。
xcitsw881#
如果你的csv符合excel2007的csv惯例,你可以使用piggybank已经提供的加载器http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/csvexcelstorage.java?view=markup它有一个跳过csv头的选项 SKIP_INPUT_HEADER
SKIP_INPUT_HEADER
sg3maiej2#
免责声明:我不擅长java。你需要一个自定义项。我不知道你到底要什么,但这个自定义项将采取一系列的csv文件,并把它们变成Map,其中的关键是在文件的顶部的值。这应该是一个足够的 backbone ,这样你就可以把它变成你想要的。我在远程和本地做的两个测试都表明这是可行的。
package myudfs; import java.io.IOException; import org.apache.pig.LoadFunc; import java.util.Map; import java.util.HashMap; import java.util.ArrayList; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; public class ExampleCSVLoader extends LoadFunc { protected RecordReader in = null; private String fieldDel = "" + '\t'; private Map<String, String> outputMap = null; private TupleFactory mTupleFactory = TupleFactory.getInstance(); // This stores the fields that are defined in the first line of the file private ArrayList<Object> topfields = null; public ExampleCSVLoader() {} public ExampleCSVLoader(String delimiter) { this(); this.fieldDel = delimiter; } @Override public Tuple getNext() throws IOException { try { boolean notDone = in.nextKeyValue(); if (!notDone) { outputMap = null; topfields = null; return null; } String value = in.getCurrentValue().toString(); String[] values = value.split(fieldDel); Tuple t = mTupleFactory.newTuple(1); ArrayList<Object> tf = new ArrayList<Object>(); int pos = 0; for (int i = 0; i < values.length; i++) { if (topfields == null) { tf.add(values[i]); } else { readField(values[i], pos); pos = pos + 1; } } if (topfields == null) { topfields = tf; t = mTupleFactory.newTuple(); } else { t.set(0, outputMap); } outputMap = null; return t; } catch (InterruptedException e) { int errCode = 6018; String errMsg = "Error while reading input"; throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e); } } // Applies foo to the appropriate value in topfields private void readField(String foo, int pos) { if (outputMap == null) { outputMap = new HashMap<String, String>(); } outputMap.put((String) topfields.get(pos), foo); } @Override public InputFormat getInputFormat() { return new TextInputFormat(); } @Override public void prepareToRead(RecordReader reader, PigSplit split) { in = reader; } @Override public void setLocation(String location, Job job) throws IOException { FileInputFormat.setInputPaths(job, location); } }
加载目录的示例输出:
csv1.in csv2.in ------- --------- A|B|C D|E|F Hello|This|is PLEASE|WORK|FOO FOO|BAR|BING OR|EVERYTHING|WILL BANG|BOSH BE|FOR|NAUGHT
生成此输出:
A: {M: map[]} () ([D#PLEASE,E#WORK,F#FOO]) ([D#OR,E#EVERYTHING,F#WILL]) ([D#BE,E#FOR,F#NAUGHT]) () ([A#Hello,B#This,C#is]) ([A#FOO,B#BAR,C#BING]) ([A#BANG,B#BOSH])
这个 () s是文件的顶行。 getNext() 要求我们返回一些内容,否则文件将停止处理。因此,它们返回一个空模式。
()
getNext()
2条答案
按热度按时间xcitsw881#
如果你的csv符合excel2007的csv惯例,你可以使用piggybank已经提供的加载器http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/csvexcelstorage.java?view=markup
它有一个跳过csv头的选项
SKIP_INPUT_HEADER
sg3maiej2#
免责声明:我不擅长java。
你需要一个自定义项。我不知道你到底要什么,但这个自定义项将采取一系列的csv文件,并把它们变成Map,其中的关键是在文件的顶部的值。这应该是一个足够的 backbone ,这样你就可以把它变成你想要的。
我在远程和本地做的两个测试都表明这是可行的。
加载目录的示例输出:
生成此输出:
这个
()
s是文件的顶行。getNext()
要求我们返回一些内容,否则文件将停止处理。因此,它们返回一个空模式。