我尝试在pig中执行以下步骤1到步骤4:
STEP 1:- Create a user table:and take data from /tmp/users.txt-
|Column 1 | USER ID |int|
|Column 2 |EMAIL|chararray|
|Column 3 |LANGUAGE |chararray|
|Column 4 |LOCATION |chararray|
STEP 2:- Crate a transaction table and take data from /tmp/transaction.txt:-
|Column 1 | ID |int|
|Column 2 |PRODUCT|int|
|Column 3 |USER ID |int|
|Column 4 |PURCHASE AMOUNT |double|
|Coulmn 5 |DESCRIPTION |chararray|
Step 3:- Find out the count of each product in distinctive Locations.
Step 4:- Display the results.
为了实现上述目标,我做了以下工作:
users = LOAD '/tmp/users.txt' USING PigStorage(',') AS (USERID:int, EMAIL:chararray, LANGUAGE:chararray, LOCATION: chararray);
trans = LOAD '/tmp/transaction.txt' USING PigStorage(',') AS (ID:int, PRODUCT:int, USERID:int, PURCHASEAMOUNT: double, DESCRIPTION: chararray);
users_trans = JOIN users BY USERID RIGHT, trans BY USERID;
B = GROUP users_trans BY (DESCRIPTION,LOCATION);
C = FOREACH B GENERATE group as comb, COUNT(users_trans) AS Total;
DUMP C;
但是,我有错误。。如果你能帮忙的话会很有帮助的,因为我是新手。
##########################################
Dataset
user.txt
1 creator@gmail.com EN US
2 creator@gmail.com EN GB
3 creator@gmail.com FR FR
4 creator@gmail.com IN HN
5 creator@gmail.com PAK IS
transaction.txt
1 1 1 300 a jumper
2 1 2 300 a jumper
3 1 5 300 a jumper
4 2 3 100 a rubber chicken
5 1 3 300 a jumper
6 5 4 500 a soapbox
7 3 3 200 a adhesive
8 4 1 300 a lotion
9 4 4 500 a sweater
10 5 4 600 a jeans
Error Log:
2019-12-27 06:17:22,180 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/tmp/temp2029752934/tmp-883821114/part-r-00000:0+130
2019-12-27 06:17:22,242 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - (EQUATOR) 0 kvi 26214396(104857584)
2019-12-27 06:17:22,242 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 100
2019-12-27 06:17:22,242 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - soft limit at 83886080
2019-12-27 06:17:22,242 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - bufstart = 0; bufvoid = 104857600
2019-12-27 06:17:22,242 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - kvstart = 26214396; length = 6553600
2019-12-27 06:17:22,244 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2019-12-27 06:17:22,248 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.impl.util.SpillableMemoryManager - Selected heap (PS Old Gen) of size 699400192 to monitor. collectionUsageThreshold = 489580128, usageThreshold = 489580128
2019-12-27 06:17:22,248 [LocalJobRunner Map Task Executor #0] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2019-12-27 06:17:22,250 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Map - Aliases being processed per job phase (AliasName[line,offset]): M: C[7,4],B[6,4] C: C[7,4],B[6,4] R: C[7,4]
2019-12-27 06:17:22,254 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.LocalJobRunner -
2019-12-27 06:17:22,254 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2019-12-27 06:17:22,254 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Spilling map output
2019-12-27 06:17:22,254 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - bufstart = 0; bufend = 100; bufvoid = 104857600
2019-12-27 06:17:22,254 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - kvstart = 26214396(104857584); kvend = 26214360(104857440); length = 37/6553600
2019-12-27 06:17:22,262 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine - Aliases being processed per job phase (AliasName[line,offset]): M: C[7,4],B[6,4] C: C[7,4],B[6,4] R: C[7,4]
2019-12-27 06:17:22,264 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2019-12-27 06:17:22,265 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.Task - Task:attempt_local1424814286_0002_m_000000_0 is done. And is in the process of committing
2019-12-27 06:17:22,266 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.LocalJobRunner -map
2019-12-27 06:17:22,266 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local1424814286_0002_m_000000_0' done.
2019-12-27 06:17:22,266 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.LocalJobRunner -Finishing task: attempt_local1424814286_0002_m_000000_0
2019-12-27 06:17:22,266 [Thread-18] INFO org.apache.hadoop.mapred.LocalJobRunner - map task executor complete.
2019-12-27 06:17:22,266 [Thread-18] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for reduce tasks
2019-12-27 06:17:22,267 [pool-9-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local1424814286_0002_r_000000_0
2019-12-27 06:17:22,272 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - File Output Committer Algorithm version is 1
2019-12-27 06:17:22,272 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2019-12-27 06:17:22,274 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorProcessTree : [ ]
2019-12-27 06:17:22,274 [pool-9-thread-1] INFO org.apache.hadoop.mapred.ReduceTask - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2582aa54
2019-12-27 06:17:22,275 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - MergerManager: memoryLimit=652528832, maxSingleShuffleLimit=163132208, mergeThreshold=430669056, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2019-12-27 06:17:22,275 [EventFetcher for fetching Map Completion Events] INFO org.apache.hadoop.mapreduce.task.reduce.EventFetcher - attempt_local1424814286_0002_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2019-12-27 06:17:22,276 [localfetcher#2] INFO org.apache.hadoop.mapreduce.task.reduce.LocalFetcher - localfetcher#2 about to shuffle output of map attempt_local1424814286_0002_m_000000_0 decomp: 14 len: 18 to MEMORY
2019-12-27 06:17:22,277 [localfetcher#2] INFO org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput - Read 14 bytes from map-output for attempt_local1424814286_0002_m_000000_0
2019-12-27 06:17:22,277 [localfetcher#2] INFO org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - closeInMemoryFile -> map-output of size: 14, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->14
2019-12-27 06:17:22,277 [EventFetcher for fetching Map Completion Events] INFO org.apache.hadoop.mapreduce.task.reduce.EventFetcher - EventFetcher is interrupted.. Returning
2019-12-27 06:17:22,278 [Readahead Thread #3] WARN org.apache.hadoop.io.ReadaheadPool - Failed readahead on ifile
EBADF: Bad file descriptor
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:267)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:146)
at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:208)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-12-27 06:17:22,278 [pool-9-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - 1 / 1 copied.
2019-12-27 06:17:22,280 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2019-12-27 06:17:22,280 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2019-12-27 06:17:22,280 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 7 bytes
2019-12-27 06:17:22,281 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - Merged 1 segments, 14 bytes to disk to satisfy reduce memory limit
2019-12-27 06:17:22,281 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - Merging 1 files, 18 bytes from disk
2019-12-27 06:17:22,281 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - Merging 0 segments, 0 bytes from memory into reduce
2019-12-27 06:17:22,281 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2019-12-27 06:17:22,281 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 7 bytes
2019-12-27 06:17:22,282 [pool-9-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - 1 / 1 copied.
2019-12-27 06:17:22,283 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - File Output Committer Algorithm version is 1
2019-12-27 06:17:22,283 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2019-12-27 06:17:22,284 [pool-9-thread-1] INFO org.apache.pig.impl.util.SpillableMemoryManager - Selected heap (PS Old Gen) of size 699400192 to monitor. collectionUsageThreshold = 489580128, usageThreshold = 489580128
2019-12-27 06:17:22,285 [pool-9-thread-1] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2019-12-27 06:17:22,286 [pool-9-thread-1] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce - Aliases being processed per job phase (AliasName[line,offset]): M: C[7,4],B[6,4] C: C[7,4],B[6,4] R: C[7,4]
2019-12-27 06:17:22,287 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Task - Task:attempt_local1424814286_0002_r_000000_0 is done. And is in the process of committing
2019-12-27 06:17:22,289 [pool-9-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - 1 / 1 copied.
2019-12-27 06:17:22,289 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Task - Task attempt_local1424814286_0002_r_000000_0 is allowed to commit now
2019-12-27 06:17:22,292 [pool-9-thread-1] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_local1424814286_0002_r_000000_0' to file:/tmp/temp2029752934/tmp726323435/_temporary/0/task_local1424814286_0002_r_000000
2019-12-27 06:17:22,292 [pool-9-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2019-12-27 06:17:22,292 [pool-9-thread-1] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local1424814286_0002_r_000000_0' done.
2019-12-27 06:17:22,292 [pool-9-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Finishing task: attempt_local1424814286_0002_r_000000_0
2019-12-27 06:17:22,292 [Thread-18] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce task executor complete.
2019-12-27 06:17:22,460 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local1424814286_0002
2019-12-27 06:17:22,460 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases B,C
2019-12-27 06:17:22,460 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: C[7,4],B[6,4] C: C[7,4],B[6,4] R: C[7,4]
2019-12-27 06:17:22,463 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,464 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,465 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,471 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2019-12-27 06:17:22,474 [main] INFO org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.9.2 0.16.0 root 2019-12-27 06:17:20 2019-12-27 06:17:22 HASH_JOIN,GROUP_BY
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local1289071959_0001 2 1 n/a n/a n/a n/a n/a n/a n/a n/a trans,users,users_trans HASH_JOIN
job_local1424814286_0002 1 1 n/a n/a n/a n/a n/a n/a n/a n/a B,C GROUP_BY,COMBINER file:/tmp/temp2029752934/tmp726323435,
Input(s):
Successfully read 5 records from: "/tmp/users.txt"
Successfully read 10 records from: "/tmp/transaction.txt"
Output(s):
Successfully stored 1 records in: "file:/tmp/temp2029752934/tmp726323435"
Counters:
Total records written : 1
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local1289071959_0001 -> job_local1424814286_0002,
job_local1424814286_0002
2019-12-27 06:17:22,475 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,476 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,477 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,485 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,486 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,487 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metricswith processName=JobTracker, sessionId= - already initialized
2019-12-27 06:17:22,492 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 15 time(s).
2019-12-27 06:17:22,493 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 55 time(s).
2019-12-27 06:17:22,493 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2019-12-27 06:17:22,496 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2019-12-27 06:17:22,496 [main] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2019-12-27 06:17:22,503 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 1
2019-12-27 06:17:22,503 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2019-12-27 06:17:22,541 [main] INFO org.apache.pig.Main - Pig script completed in 2 seconds and 965 milliseconds (2965 ms)
2条答案
按热度按时间abithluo1#
考虑到您拥有的数据,第一个问题是您没有逗号,因此必须将行作为一个整体加载,然后再拆分它们。我在事务文件中使用了两个或多个空格,因为您的最后一列似乎是一个包含空格的字符串。为了准确起见,我建议使用比空格/制表符更好的分隔符。
然后,group by需要引用数据来自的关系。
我想其他的都很好,虽然我不确定
COUNT(X)
```A = LOAD '/tmp/users.txt' USING PigStorage() as (line:chararray);
USERS = FOREACH A GENERATE FLATTEN(STRSPLIT(line, '\s+')) AS (userid:int,email:chararray,language:chararray,location:chararray);
B = LOAD '/tmp/transactions.txt' USING PigStorage() as (line:chararray);
TRANS = FOREACH B GENERATE FLATTEN(STRSPLIT(line, '\s\s+')) AS (id:int,product:int,userid:int,purchase:double,desc:chararray);
X = JOIN USERS BY userid RIGHT, TRANS BY userid;
X_grouped = GROUP X BY (TRANS::desc, USERS::location);
RES = FOREACH X_grouped GENERATE group as comb, COUNT(X) AS Total;
\d RES;
((a jeans,HN),1)
((a jumper,FR),1)
((a jumper,GB),1)
((a jumper,IS),1)
((a jumper,US),1)
((a lotion,US),1)
((a soapbox,HN),1)
((a sweater,HN),1)
((a adhesive,FR),1)
((a rubber chicken,FR),1)
cvxl0en22#
建议
首先:看来你是从Pig开始的。知道cloudera最近决定贬低pig可能很有价值。当然,它不会停止存在,但是如果您计划学习新技能或实现新用例,请三思而后行。我建议把Hive/星火/ Impala 作为更经得起未来考验的替代品。
回答
你的工作成功了,但可能没有你想要的产出。有几个提示可能是错误的(数据类型/字段名),但是这并不指向代码中的特定问题。
我的建议是找出问题到底发生在哪里。只需切掉代码的结尾并打印一个中间结果,看看您是否仍在正确的轨道上。
在(可能的)情况下,您的load语句已经有问题了,值得注意的是,您仍然可以进一步缩小它的范围。首先加载,然后应用模式。