我尝试运行一个 Pig 脚本,该脚本调用了一个用 Java 编写的用户定义函数(UDF)。作业最终因 Java 堆空间错误(java.lang.OutOfMemoryError: Java heap space)而失败。我试过用 -Xms1024m 选项运行该作业,它能处理较小的文件,但处理较大的文件时仍然失败。尽管我的集群足够强大,不至于被这么小的文件难倒,我还是想知道如何修复这个内存泄漏。有人能帮忙吗?
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.*;
import com.tictactec.ta.lib.CoreAnnotated;
import com.tictactec.ta.lib.MAType;
import com.tictactec.ta.lib.MInteger;
import com.tictactec.ta.lib.RetCode;
import com.tictactec.ta.lib.meta.annotation.InputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.InputParameterType;
import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.OptInputParameterType;
import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.OutputParameterType;
public class taLib extends EvalFunc<DataBag>
{
private static final int MIN_ARGS = 3;
public static CoreAnnotated core = new CoreAnnotated();
private static Method func_ref = null;
public DecimalFormat df = new DecimalFormat("#.###");
public DataBag exec(Tuple args) throws IOException
{
DataBag input=null;
MInteger outStart = new MInteger();
MInteger outLen = new MInteger();
Map<String,Object>outputParams=new HashMap<String, Object>();
String func_name;
List<Integer> ip_colmns= new ArrayList<Integer>();
List<double[]>ip_list=new ArrayList<double[]>();
List<String>opt_type=new ArrayList<String>();
List<Object>opt_params=new ArrayList<Object>();
//////
long m1=Runtime.getRuntime().freeMemory();
System.out.println(m1);
long m2=Runtime.getRuntime().totalMemory();
System.out.println(m2);
//////
int ip_noofparams=0;
int op_noofparams=0;
int opt_noofparams=0;
if (args == null || args.size() < MIN_ARGS)
throw new IllegalArgumentException("talib: must have at least " +
MIN_ARGS + " args");
if(args.get(0) instanceof DataBag)
{input = (DataBag)args.get(0);}
else{throw new IllegalArgumentException("Only a valid bag name can be
passed");}
// get no of fields in bag
Tuple t0=input.iterator().next();
int fields_in_bag=t0.getAll().size();
if(args.get(1) instanceof String)
{func_name = (String)args.get(1);}
else{throw new IllegalArgumentException("Only valid function name can be
passed at arg 1");}
func_ref=methodChk(func_name);
if (func_ref == null) {
throw new IllegalArgumentException("talib: function "
+ func_name + " was not found");
}
for (Annotation[] annotations : func_ref.getParameterAnnotations())
{
for (Annotation annotation : annotations)
{
if(annotation instanceof InputParameterInfo)
{
InputParameterInfo inputParameterInfo =
(InputParameterInfo)annotation;
if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price))
{
ip_noofparams=numberOfSetBits(inputParameterInfo.flags());
}
else
{
ip_noofparams++;
}
}
if(annotation instanceof OptInputParameterInfo)
{
OptInputParameterInfo optinputParameterInfo=
(OptInputParameterInfo)annotation;
opt_noofparams++;
if
(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange))
{
opt_type.add("Integer");
}
else
if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange))
{
opt_type.add("Double");
}
else
if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList))
{
opt_type.add("String");
}
else{throw new IllegalArgumentException("whoopsie ...serious
mess in opt_annotations");}
}
if (annotation instanceof OutputParameterInfo)
{
OutputParameterInfo outputParameterInfo =
(OutputParameterInfo) annotation;
op_noofparams++;
if
(outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real))
{
outputParams.put(outputParameterInfo.paramName(), new
double[(int) input.size()]);
}
else if
(outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer))
{
outputParams.put(outputParameterInfo.paramName(), new
int[(int)input.size()]);
}
}
}
}
int total_params =ip_noofparams+opt_noofparams;
if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong
no of argumets passed to UDF");}
// get the ip colmns no's
for(int i=2;i<(2+ip_noofparams);i++)
{
if(args.get(i) instanceof Integer )
{
if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag)
{
ip_colmns.add((Integer) args.get(i));
}
else{throw new IllegalArgumentException("The input colmn specified
is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));}
}
else{throw new IllegalArgumentException("Wrong arguments entered:
Only"+ip_noofparams+"field no's of type(integer) allowed for fn"+func_name ); }
}
// create a list of ip arrays
for(int i=0;i<ip_colmns.size();i++)
{
ip_list.add((double[]) Array.newInstance(double.class, (int)input.size()));
}
int z=0;
int x=0;
// fill up the arrays
for(Tuple t1: input)
{
Iterator<double[]> itr=ip_list.iterator();
z=0;
while(itr.hasNext())
{
if((Double)t1.get(ip_colmns.get(z)) instanceof Double)
{
((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++));
}
else{throw new IllegalArgumentException("Illegal argument while
filling up array...only double typr allowed");}
}
x++;
}
//deal with opt params
int s=0;
for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++)
{
if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString()))
{
if(opt_type.get(s).equalsIgnoreCase("String"))
{
String m=args.get(i).toString().toLowerCase();
String ma=m.substring(0, 1).toUpperCase();
String mac=m.substring(1);
String macd=ma+mac;
MAType type =MAType.valueOf(macd);
opt_params.add(type);
s++;
}
else{
opt_params.add(args.get(i));
s++;
}
}
else if(opt_type.get(s).equalsIgnoreCase("Double"))
{
if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer"))
{
opt_params.add((Double)((Integer)args.get(i)+0.0));
s++;
}
else{throw new IllegalArgumentException("Opt arguments do
not match for fn:"+func_name+", pls enter opt arguments in right order"); }
}
else{throw new IllegalArgumentException("Opt arguments do not match
for fn:"+func_name+", pls enter opt arguments in right order");}
}
List<Object> ta_argl = new ArrayList<Object>();
ta_argl.add(new Integer(0));
ta_argl.add(new Integer((int)input.size() - 1));
for(double[]in: ip_list)
{
ta_argl.add(in);
}
if(opt_noofparams!=0)
{ta_argl.addAll(opt_params);}
ta_argl.add(outStart);
ta_argl.add(outLen);
for (Map.Entry<String, Object> entry : outputParams.entrySet())
{
ta_argl.add(entry.getValue());
}
RetCode rc = RetCode.Success;
try {
rc = (RetCode)func_ref.invoke(core, ta_argl.toArray());
} catch (Exception e)
{
assert false : "I died in ta-lib, but Java made me a zombie...";
}
assert rc == RetCode.Success : "ret code from " + func_name;
if (outLen.value == 0) return null;
//////
DataBag ret=null;
ret =outTA(input,outputParams,outStart);
outputParams.clear();
ip_list.clear();
opt_params.clear();
opt_type.clear();
ip_colmns.clear();
Runtime.getRuntime().gc();
return ret;
}
public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart)
{
DataBag nbag=null;
TupleFactory mTupleFactory=TupleFactory.getInstance();
BagFactory mBagFactory=BagFactory.getInstance();
nbag=mBagFactory.newDefaultBag();
Tuple tw=bag.iterator().next();
int fieldsintup=tw.getAll().size();
for(Tuple t0: bag)
{
Tuple t1=mTupleFactory.newTuple();
for(int z=0;z<fieldsintup;z++)
{
try {
t1.append(t0.get(z));
} catch (Exception e) {
// TODO Auto-generated catch block
System.out.println("Ouch");
}
}
nbag.add(t1);
}
int i = 0;
int j=0;
for (Tuple t2: nbag)
{
if(i>=outStart.value)
{
for(Map.Entry<String,Object>entry: outputParams.entrySet())
{
t2.append(entry.getKey().substring(3).toString());
if(entry.getValue() instanceof double[])
{
t2.append( new Double
(df.format(((double[])entry.getValue())[j])));
}
else if(entry.getValue() instanceof int[])
{
t2.append( ((int[])entry.getValue())[j]);
}
else{throw new
IllegalArgumentException(entry.getValue().getClass()+"not supported");}
}
i++;j++;
}
else
{t2.append(0.0);
i++;
}
}
return nbag;
}
public Method methodChk(String fn)
{
String fn_name=fn;
Method tmp_fn=null;
for (Method meth: core.getClass().getDeclaredMethods())
{
if (meth.getName().equalsIgnoreCase(fn_name))
{
tmp_fn = meth;
break;
}
}
return tmp_fn;
}
public int numberOfSetBits(int i) {
i = i - ((i >> 1) & 0x55555555);
i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24;
}
}
1条答案
按热度按时间xqkwcwgp1#
可能是 bzip2 编解码器的问题——其 API 文档明确指出它非常消耗内存:
http://hadoop.apache.org/common/docs/r0.20.0/api/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.html
“压缩需要大量内存。”
当你使用
-Xms2048m
时,你是为 Pig 的 grunt shell 设置了这个选项,还是为 Map/Reduce 作业设置的?您可以通过 JobTracker 进行确认:找到失败的作业,打开它的 job.xml 并查找
mapred.child.java.opts
属性。另外请注意:-Xms 只设置 JVM 的初始堆大小,最大堆大小由 -Xmx 决定。