Pig UDF throws an error

Posted by y1aodyip on 2021-06-03 in Hadoop

I am getting an error from my Pig script.
Pig script:

REGISTER /var/lib/hadoop-hdfs/udf.jar;

REGISTER /var/lib/hadoop-hdfs/udf2.jar;

INPUT_LINES = Load 'hdfs:/Inputdata/DATA_GOV_US_Farmers_Market_DataSet.csv' using PigStorage(',') AS (FMID:chararray, MarketName:chararray, Website:chararray, Street:chararray, City:chararray, County:chararray, State:chararray, Zip:chararray, Schedule:chararray, X:chararray, Y:chararray, Location:chararray, Credit:chararray, WIC:chararray, WICcash:chararray, SFMNP:chararray, SNAP:chararray, Bakedgoods:chararray, Cheese:chararray, Crafts:chararray, Flowers:chararray, Eggs:chararray, Seafood:chararray, Herbs:chararray, Vegetables:chararray, Honey:chararray, Jams:chararray, Maple:chararray, Meat:chararray, Nursery:chararray, Nuts:chararray, Plants:chararray, Poultry:chararray, Prepared:chararray, Soap:chararray, Trees:chararray, Wine:chararray);

FILTERED_COUNTY = FILTER INPUT_LINES BY County=='Los Angeles';

REQUIRED_COLUMNS = FOREACH FILTERED_COUNTY GENERATE FMID,MarketName,$12..;

PER = FOREACH REQUIRED_COLUMNS GENERATE FMID,MarketName,fm($2..) AS Percentage;

STATUS = FOREACH PER GENERATE FMID,MarketName,Percentage,status(Percentage) AS Stat;

UDF 1:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class fm extends EvalFunc<Integer>
{
    String temp;
    int per;
    int count = 0;

    public Integer exec(Tuple input) throws IOException
    {
        if (input == null || input.size() == 0)
            return -1;
        try
        {
            for (int i = 0; i < 25; i++)
            {
                if (input.get(i) == "" || input.get(i) == null)
                    return -1;

                temp = (String) input.get(i);
                if (temp.equals("Y"))
                    count++;
            }
            per = count * 4;
            count = 0;
            return per;
        }
        catch (Exception e)
        {
            throw new IOException("Caught exception processing input row ", e);
        }
    }
}

UDF 2:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class status extends EvalFunc<String>
{
    public String exec(Tuple input) throws IOException
    {
        if (input == null || input.size() == 0)
            return null;
        try
        {
            String str = (String) input.get(0);
            int i = Integer.parseInt(str);
            if (i >= 60)
                return "HIGH";
            else if (i <= 40)
                return "LOW";
            else
                return "MEDIUM";
        }
        catch (Exception e)
        {
            throw new IOException("Caught exception processing input row ", e);
        }
    }
}

Dataset:
https://onedrive.live.com/redir?resid=7f81451078f4dbe8%21113
Error:

Pig Stack Trace

ERROR 2078: Caught error from UDF: status [Caught exception processing input row ]

org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias STATUS. Backend error : Caught error from UDF: status [Caught exception processing input row ]
    at org.apache.pig.PigServer.openIterator(PigServer.java:828)
    at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:696)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:320)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:194)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:170)
    at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:69)
    at org.apache.pig.Main.run(Main.java:538)
    at org.apache.pig.Main.main(Main.java:157)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: status [Caught exception processing input row ]
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:365)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:434)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:340)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:283)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:278)
Answer #1 (qvtsj1bj):

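It looks like your problem may be that you are casting the input to a String in your status UDF, but your fm UDF actually returns an Integer. So you should have: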

Integer i = (Integer)input.get(0);

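That will definitely cause a problem until you fix it. Without the original error message, I can't say whether some other problem is also happening earlier on.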
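To illustrate the fix outside a Pig job, here is a minimal, Pig-free sketch of what the status UDF's exec body could look like. StatusClassifier and classify are hypothetical names; in the real UDF, exec would simply return classify(input.get(0)). The sketch accepts any Number (a slight generalization of the Integer cast above) and, as an assumption, falls back to parsing a numeric string; the HIGH/MEDIUM/LOW thresholds are the ones from the original UDF.

```java
// Hypothetical, Pig-free helper mirroring the body of the status UDF's exec().
// In the real UDF: return StatusClassifier.classify(input.get(0));
final class StatusClassifier {
    private StatusClassifier() {}

    /** Maps the percentage produced by fm to HIGH / MEDIUM / LOW (original thresholds). */
    static String classify(Object value) {
        if (value == null) {
            return null; // same as the original UDF for missing input
        }
        final int pct;
        if (value instanceof Number) {
            // fm is an EvalFunc<Integer>, so status receives an Integer, not a String
            pct = ((Number) value).intValue();
        } else {
            // Assumption: also tolerate a numeric string
            pct = Integer.parseInt(value.toString().trim());
        }
        if (pct >= 60) {
            return "HIGH";
        } else if (pct <= 40) {
            return "LOW";
        }
        return "MEDIUM";
    }

    public static void main(String[] args) {
        Object fromFm = Integer.valueOf(72); // what fm actually produces
        try {
            String s = (String) fromFm;      // the cast in the original status UDF
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println("original cast fails: " + e.getClass().getSimpleName());
        }
        System.out.println(classify(fromFm)); // prints HIGH
    }
}
```

The first half of main reproduces the original failure: casting fm's Integer to String throws a ClassCastException, which is most likely what the IOException in the trace is wrapping.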
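I would expect your stack trace to include the original exception message, which would help you debug this; it's strange that it doesn't. Without it, all you can do is analyze the code.
This may help with debugging in the future: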

throw new IOException("Caught exception processing input row " + e.getMessage(), e);
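A minimal, Pig-free sketch of the same idea, assuming a hypothetical castToString method standing in for the UDF body: the wrapper message now carries the cause's text, and a small hypothetical rootCause helper walks getCause() down to the innermost exception, which helps when a framework wraps the error several times. The exact ClassCastException wording differs between JVMs, so no expected output is shown.

```java
import java.io.IOException;

// Pig-free sketch: include the underlying exception's message in the wrapper.
final class WrapDemo {
    private WrapDemo() {}

    /** Hypothetical stand-in for the body of a UDF's exec(). */
    static String castToString(Object value) throws IOException {
        try {
            return (String) value;
        } catch (Exception e) {
            // Original: new IOException("Caught exception processing input row ", e)
            throw new IOException("Caught exception processing input row " + e.getMessage(), e);
        }
    }

    /** Follows the cause chain to the innermost exception. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        try {
            castToString(Integer.valueOf(72));
        } catch (IOException e) {
            System.out.println(e.getMessage()); // now names the Integer/String mismatch
            System.out.println("root cause: " + rootCause(e).getClass().getName());
        }
    }
}
```

Note that getMessage() can return null for some exceptions, in which case the wrapper reads "...row null"; the cause is still attached either way.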

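For the fm UDF, I would also suggest making the variables temp, per, and count local to the exec method rather than instance variables of the class, since they don't need to be instance variables. This probably isn't what is causing the error, but it is better coding practice.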
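A Pig-free sketch of the fm logic with temp, per, and count as locals, assuming the 25 fields arrive as a List (FmLogic and percentage are hypothetical names). Making count local also removes a real hazard in the original: count is reset only on the success path, so a row that hits return -1 partway through leaves its partial count behind for the next row. Two further changes, labeled as such: null is checked first and isEmpty() replaces == "" (which compares references, not contents), and rows with fewer than 25 fields return -1 instead of throwing.

```java
import java.util.Arrays;
import java.util.List;

// Pig-free sketch of fm's logic; every per-row variable is local to the call.
final class FmLogic {
    private FmLogic() {}

    // The script passes $2.. of REQUIRED_COLUMNS, i.e. the 25 columns Credit..Wine.
    static final int FIELD_COUNT = 25;

    /** Percentage of "Y" fields (each worth 4), or -1 if a field is missing. */
    static int percentage(List<?> fields) {
        if (fields == null || fields.size() < FIELD_COUNT) {
            return -1; // assumption: short rows are treated as missing data
        }
        int count = 0; // fresh on every call, so nothing leaks between rows
        for (int i = 0; i < FIELD_COUNT; i++) {
            Object field = fields.get(i);
            if (field == null) {
                return -1;
            }
            String temp = field.toString();
            if (temp.isEmpty()) { // content check, unlike == ""
                return -1;
            }
            if (temp.equals("Y")) {
                count++;
            }
        }
        int per = count * 4; // at most 25 * 4 = 100
        return per;
    }

    public static void main(String[] args) {
        String[] row = new String[FIELD_COUNT];
        Arrays.fill(row, "N");
        row[0] = "Y";
        row[5] = "Y";
        System.out.println(percentage(Arrays.asList(row))); // prints 8
    }
}
```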
