我编写了一个udf,它读取一个输入文件,并将数据分为string和integer或string和double。
我的自定基金运作良好。我还编写了一个pig脚本,在hdfs上使用上述jar。
现在我想把这段代码与talend进行大数据集成。我怎么能做到这一点。
udf中的java代码如下:
package com.test.udf;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class CheckDataType extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
// TODO Auto-generated method stub
String valString = null;
Integer valInt = null;
Double valDouble =null;
String str = (String) input.get(0);
Tuple outputTuple =TupleFactory.getInstance().newTuple(2);
if (str != null){
try{
valInt = Integer.parseInt(str);
outputTuple.set(0, valString);
outputTuple.set(1, valInt);
}
catch(Exception e){
try{
valDouble = Double.parseDouble(str) ;
outputTuple.set(0, valString);
outputTuple.set(1, valDouble);
}
catch(Exception ew){
outputTuple.set(0, str);
outputTuple.set(1, null);
}
}
}
return outputTuple;
}
}
另外,我写的Pig剧本如下:
REGISTER 'CONVERT.jar';
data_load = LOAD '/tmp/input/testfile.txt' USING PigStorage(',') AS (col1:chararray, col2:chararray, col3:chararray, col4:chararray, col5:chararray);
data_grp = GROUP data_load BY ($input_col);
data_flatten = FOREACH data_grp GENERATE FLATTEN(com.test.udf.CheckDataType(*));
rmf /tmp/output;
STORE data_flatten INTO '/tmp/output' USING PigStorage(',');
我如何在talend中集成这些数据以获取大数据。
1条答案
按热度按时间eaf3rand1#
最新答案:
您需要将pig脚本分为3个组件:pigload、pigcode和pigstoresult,并将它们连接起来。udf可以作为代码或单独的jar包含在pigload组件中。
逐步说明可在以下位置找到:https://www.evernote.com/l/ajonexs0_sbnwpdfmpbyjsuvs0vmas04egm