I have a small Pig script in which I call a Python UDF using the recently introduced StreamingUDF feature:
REGISTER 'process_tweet.py' USING streaming_python AS process_tweet;
REGISTER /usr/lib/hbase/lib/*.jar;
tweets = LOAD 'hbase://brand_tweets' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('data:json') AS (json:chararray);
tweets = LIMIT tweets 100;
tweets = foreach tweets generate flatten(process_tweet.extract(json)) as (userid:long, text: chararray);
dump tweets;
The Python function process_tweet.extract basically just deserializes a JSON object (a tweet) and returns some of its values.
from pig_util import outputSchema
import json

@outputSchema("(userid:long, text:chararray)")
def extract(tweet):
    content = json.loads(tweet)
    return long(content['user']['id']), content['text']
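To rule out the Python side as the cause, the extraction logic can be exercised outside Pig. The following is only a rough sketch under some assumptions: the fallback `outputSchema` decorator is a hypothetical stand-in used when `pig_util` is not importable, the sample tweet is made up, and `int` replaces `long` so the snippet also runs on Python 3.

```python
import json

try:
    # Available when the script is run by Pig's streaming_python engine.
    from pig_util import outputSchema
except ImportError:
    # Hypothetical stand-in so the function can be tested without Pig.
    def outputSchema(schema):
        def decorator(func):
            func.output_schema = schema
            return func
        return decorator


@outputSchema("(userid:long, text:chararray)")
def extract(tweet):
    # Deserialize the tweet and pick out the two fields of interest.
    content = json.loads(tweet)
    return int(content['user']['id']), content['text']


# Made-up sample input; real tweets carry many more fields.
sample = json.dumps({"user": {"id": 12345}, "text": "hello"})
result = extract(sample)
print(result)
```

If this runs cleanly, the failure in MapReduce mode is more likely in how Pig launches the UDF than in the function itself.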
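When executed in local mode (pig -x local), the script runs without errors and returns the expected output. In MapReduce mode, however, the job runs for a long time before failing with the following error message: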
Backend error message
---------------------
AttemptID:attempt_1394221905204_0172_r_000000_1 Info:Error: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing [POUserFunc (Name: POUserFunc(org.apache.pig.impl.builtin.StreamingUDF)[tuple] - scope-7 Operator Key: scope-7) children: null at []]: java.lang.NullPointerException
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:338)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:464)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:432)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:412)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:256)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:645)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:405)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.NullPointerException
at org.apache.pig.impl.builtin.StreamingUDF.getControllerPath(StreamingUDF.java:268)
at org.apache.pig.impl.builtin.StreamingUDF.constructCommand(StreamingUDF.java:199)
at org.apache.pig.impl.builtin.StreamingUDF.startUdfController(StreamingUDF.java:163)
at org.apache.pig.impl.builtin.StreamingUDF.initialize(StreamingUDF.java:156)
at org.apache.pig.impl.builtin.StreamingUDF.exec(StreamingUDF.java:146)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextTuple(POUserFunc.java:369)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:333)
... 14 more
1 Answer
It turns out that StreamingUDF is not supported on Hadoop 2. See the JIRA issue:
https://issues.apache.org/jira/browse/pig-3478