I am practicing Apache Pig. Using the DEFINE and STREAM operators, I want to stream a file through a Python script and get back some edited output.
Below is the file I am using.
[cloudera@localhost ~]$ cat data/movies_data.csv
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1932,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1991,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1985,3.8,5333
7,Muriels Wedding,1994,3.5,6323
8,Mothers Boys,1994,3.4,5733
9,Nosferatu Original Version,1929,3.5,5651
10,Nick of Time,1995,3.4,5333
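The output I expect from Pig using Python is: the value of the first field multiplied by 10, the second field converted to upper case, and the year in the third field incremented by 1.
Expected sample output: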
10,THE NIGHTMARE BEFORE CHRISTMAS,1994,3.9,4568
20,THE MUMMY,1933,3.5,4388
The Python code I am using:
[cloudera@localhost ~]$ cat testpy22.py
#!/usr/bin/python
import sys
import string
for line in sys.stdin:
    (f1,f2,f3,f4,f5)=str(line).strip().split(",")
    f1 = f1*10
    f2 = f2.upper()
    f3 = f3+1
    print"%d\t%s\t%d\t%.2f\t%d"%(f1,f2,f3,f4,f5)
Below is the Pig code I am trying:
grunt> a = load '/home/cloudera/data/movies_data.csv' using PigStorage(',') as (id:chararray,movie:chararray,year:chararray, point:chararray,code:chararray);
grunt> dump a;
Output(s):
Successfully stored records in: "file:/tmp/temp-947273140/tmp1180787799"
Job DAG:
job_local1521960706_0008
2017-07-15 04:56:26,250 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2017-07-15 04:56:26,250 [main] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2017-07-15 04:56:26,251 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2017-07-15 04:56:26,251 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriels Wedding,1994,3.5,6323)
(8,Mothers Boys,1994,3.4,5733)
(9,Nosferatu Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
grunt> DEFINE testpy22 `testpy22.py` SHIP('/home/cloudera/testpy22.py');
grunt> aaa = STREAM a through testpy22;
grunt> dump aaa;
When I dump the data, I get the error below. I assume the error is caused by the Python code, but I cannot find the problem.
2017-07-15 04:58:37,718 [pool-9-thread-1] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/home/cloudera/data/movies_data.csv:0+344
2017-07-15 04:58:37,736 [pool-9-thread-1] WARN org.apache.hadoop.conf.Configuration - dfs.https.address is deprecated. Instead, use dfs.namenode.https-address
2017-07-15 04:58:37,755 [pool-9-thread-1] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2017-07-15 04:58:37,787 [pool-9-thread-1] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map - Aliases being processed per job phase (AliasName[line,offset]): M: a[6,4],a[-1,-1],aaa[10,6] C: R:
===== Task Information Header =====
Command: testpy22.py (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)
Start time: Sat Jul 15 04:58:37 PDT 2017
Input-split file: file:/home/cloudera/data/movies_data.csv
Input-split start-offset: 0
Input-split length: 344
===== * * * =====
2017-07-15 04:58:37,855 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local1407418523_0009
2017-07-15 04:58:37,855 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases a,aaa
2017-07-15 04:58:37,855 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: a[6,4],a[-1,-1],aaa[10,6] C: R:
2017-07-15 04:58:37,857 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
Traceback (most recent call last):
  File "/home/cloudera/testpy22.py", line 7, in <module>
    f1,f2,f3,f4,f5=str(line).strip().split(",")
ValueError: need more than 1 value to unpack
2017-07-15 04:58:37,913 [Thread-98] ERROR org.apache.pig.impl.streaming.ExecutableManager - 'testpy22.py ' failed with exit status: 1
2017-07-15 04:58:37,914 [Thread-94] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
2017-07-15 04:58:37,917 [Thread-99] ERROR org.apache.pig.impl.streaming.ExecutableManager - testpy22.py (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming) failed with exit status: 1
===== Task Information Footer =====
End time: Sat Jul 15 04:58:37 PDT 2017
Exit code: 1
Input records: 10
Input bytes: 3568 bytes (stdin using org.apache.pig.builtin.PigStreaming)
Output records: 0
Output bytes: 0 bytes (stdout using org.apache.pig.builtin.PigStreaming)
===== * * * =====
2017-07-15 04:58:37,921 [Thread-94] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local1407418523_0009
java.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'testpy22.py ' failed with exit status: 1
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'testpy22.py ' failed with exit status: 1
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:311)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.cleanup(PigGenericMapBase.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
2017-07-15 04:58:42,875 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Ooops! Some job has failed! Specify -stop_on_failure if you want Pig to stop immediately on failure.
2017-07-15 04:58:42,877 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - job job_local1407418523_0009 has failed! Stop running all dependent jobs
2017-07-15 04:58:42,877 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2017-07-15 04:58:42,878 [main] ERROR org.apache.pig.tools.pigstats.PigStatsUtil - 1 map reduce job(s) failed!
2017-07-15 04:58:42,878 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Detected Local mode. Stats reported below may be incomplete
2017-07-15 04:58:42,879 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.0.0-cdh4.7.0 0.11.0-cdh4.7.0 cloudera 2017-07-15 04:58:37 2017-07-15 04:58:42 STREAMING
Failed!
Failed Jobs:
JobId Alias Feature Message Outputs
job_local1407418523_0009 a,aaa STREAMING,MAP_ONLY Message: Job failed! file:/tmp/temp-947273140/tmp1217312985,
Input(s):
Failed to read data from "/home/cloudera/data/movies_data.csv"
Output(s):
Failed to produce result in "file:/tmp/temp-947273140/tmp1217312985"
Job DAG:
job_local1407418523_0009
2017-07-15 04:58:42,879 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed!
2017-07-15 04:58:42,881 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1066: Unable to open iterator for alias aaa
Details at logfile: /home/cloudera/pig_1500117064292.log
grunt> 2017-07-15 04:58:43,685 [communication thread] INFO org.apache.hadoop.mapred.LocalJobRunner -
Can anyone give me some suggestions?
Python version: 2.6.6. Apache Pig version: 0.11.0-cdh4.7.0
1 Answer
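The problem you are hitting is in the line where you unpack a list directly into a series of variables (Python's "tuple unpacking" feature), that is, (f1,f2,f3,f4,f5)=str(line).strip().split(",").
If split(",") returns anything other than exactly 5 values, Python will throw an error. My guess is that you are hitting a blank line, which splits into a single empty string and produces: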
ValueError: need more than 1 value to unpack
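To make the failure concrete, here is a minimal sketch written to run on Python 2.6 and later. The helper name `parse_fields` and its parameters are hypothetical, introduced only for illustration. It also covers a second possible cause that seems worth ruling out: the task header in the log shows the script is fed through `org.apache.pig.builtin.PigStreaming`, which by default writes fields separated by tabs rather than commas, so `split(",")` on such a line would also return a single element.

```python
def parse_fields(line, delimiter=",", expected=5):
    """Return the list of fields, or None when the count is not `expected`."""
    fields = line.strip().split(delimiter)
    if len(fields) != expected:
        return None
    return fields

# A well-formed comma-separated line unpacks without error.
f1, f2, f3, f4, f5 = parse_fields("2,The Mummy,1932,3.5,4388\n")

# A blank line splits into [''] (one element), so unpacking it into five
# names is what raises ValueError; the guard returns None instead.
print(parse_fields("\n"))

# A tab-separated line also yields one element when split on ",".
tab_line = "2\tThe Mummy\t1932\t3.5\t4388\n"
print(parse_fields(tab_line))
print(parse_fields(tab_line, delimiter="\t"))
```

Checking the field count before unpacking turns a crash into a skipped record, which makes it easier to see which input lines are actually malformed.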
By all means upgrade Python to 2.7, but this is core functionality, so the version is not what is affecting you.
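Once the unpacking is fixed, two more lines in the script would fail, because every field arrives as a string: `f1*10` repeats the string ten times instead of multiplying, and `f3+1` raises a TypeError. Below is a hedged sketch of how the script might be restructured, not a definitive fix. The function names `transform` and `run` are hypothetical, and the default tab delimiter is an assumption based on the PigStreaming default; pass `delimiter=","` if your records really arrive comma-separated. The rating and code fields are passed through unchanged so that 3.9 stays 3.9.

```python
import sys

def transform(line, delimiter="\t"):
    """Return the edited record as a tab-separated string, or None to skip it."""
    fields = line.rstrip("\r\n").split(delimiter)
    if len(fields) != 5:
        return None  # blank or malformed line
    f1, f2, f3, f4, f5 = fields
    try:
        movie_id = int(f1) * 10  # numeric multiply, not string repetition
        year = int(f3) + 1
    except ValueError:
        return None  # id or year was not an integer
    return "\t".join([str(movie_id), f2.upper(), str(year), f4, f5])

def run(lines, out=sys.stdout, delimiter="\t"):
    """Write one transformed record per valid input line."""
    for line in lines:
        result = transform(line, delimiter)
        if result is not None:
            out.write(result + "\n")

# In the streaming script itself this call would be: run(sys.stdin)
# Here a small in-memory sample is used, including one blank line.
run(["1\tThe Nightmare Before Christmas\t1993\t3.9\t4568\n", "\n"])
```

With the sample above, a single record is written: 10, THE NIGHTMARE BEFORE CHRISTMAS, 1994, 3.9 and 4568, separated by tabs. The blank line is silently skipped.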