Implementing a sorting algorithm with Hadoop MapReduce?
I have a problem: when I try to sort with Hadoop, I cannot get the sorted output. Map and reduce both reach 100%, but the job then fails with an error.
Below are my mapper.py and reducer.py.
For better understanding I am including the whole code; I hope this helps to understand the problem. If anything is missing, please tell me.
**mapper.py**
#!/usr/bin/env python
# coding: utf-8
import sys
import re

words = []
doc_id = 0
line_number = 1
wordcount = 1
wordcount_per_doc = 0
df_t = 1

for line in sys.stdin:
    '''
    0. INPUT DATA | reads the content of each text file from stdin
    1. Assigns an ID to the document being read
    2. Tokenizes the line into words
    3. Formats each extracted word
    4. Appends the words of the line to the complete word list
    5. Counts the number of words in the analysed document
    6. OUTPUT DATA | each word is emitted on stdout in the following form:
       - key \t values
       - key: 'word',docid
       - values: wordcount, wordcount_per_doc, df_t
    '''
    # strip surrounding whitespace
    line = line.strip()
    # 1. assign an ID to the document being read
    if line_number == 1:
        if line == '1903':
            doc_id = 2
            line_number = 0
        else:
            doc_id = 1
            line_number = 0
    # 2. tokenize into words
    words_in_line = line.split()
    # 3. format each extracted word
    # lower case
    words_in_line = [word.lower() for word in words_in_line]
    # remove punctuation and numeric characters (with a regex)
    words_in_line = [re.sub(r'[^\w]', '', word) for word in words_in_line]
    # filter stop words (separate loop variable so that 'line' is not overwritten)
    stopwords = []
    for stopword_line in open('stopwords_en.txt'):
        stopwords.append(stopword_line.strip())
    words_in_line = [word for word in words_in_line if word not in stopwords]
    # remove words shorter than 3 characters
    words_in_line = [word for word in words_in_line if len(word) > 2]
    # 4. append the words of the line to the complete word list
    words += words_in_line
    # 5. count the words of the line and accumulate wordcount_per_doc
    wordcount_per_doc += len(words_in_line)

# 6. OUTPUT DATA | each word is emitted on stdout
for word in words:
    print("%s,%i\t%i\t%i\t%i" % (word, doc_id, wordcount, wordcount_per_doc, df_t))
**reducer.py**
#!/usr/bin/env python
# coding: utf-8
import sys
import os
import csv
from math import log10
from collections import defaultdict

words = []                        # list of words
last_word_docid_pair = None       # for the wordcount computation
df_t_dict = defaultdict(set)      # for the df_t computation
docid_list = set()                # number of documents in the collection

'''
INPUT DATA | sorted output of the MAP step on stdin: word,docid\twordcount\tword_per_doc\tdf_t
KEY: the pair (word, docid)
VALUE: wordcount\tword_per_doc\tdf_t
LOOP #1 - processing of each line
    1. Computes the "WordCount" by summing the occurrences of each (word, docid) key and appends it to the "words" list
    2. Computes df_t by building the dictionary df_t_dict (key=word, value=set(docid))
    3. Computes the number of documents N in the collection by building a set of all docids
LOOP #2 - processing of the final "words" list
    5. Sets the final df_t value for each word
    6. Computes TF-IDF
    7. Prints on each line of stdout: <word,docid______TFIDF>
OUTPUT DATA | on each line of stdout: <word,docid______TFIDF>
KEY: the pair (word, docid)
VALUE: TFIDF
OUTPUT DATA | file words_top20_tfidf_docid<docid>.csv
    top 20 words with the highest weight for each document
'''

for line in sys.stdin:
    # get key/values
    line = line.strip()
    key, wordcount, wordcount_per_doc, df_t = line.split("\t")
    wordcount_per_doc = int(wordcount_per_doc)
    wordcount = int(wordcount)
    df_t = int(df_t)
    word, docid = key.split(",")
    docid = int(docid)
    word_docid_pair = (word, docid)
    # 1. compute the "WordCount"
    if last_word_docid_pair is None:          # handling of the first word
        last_word_docid_pair = word_docid_pair
        last_wordcount = 0
        last_wordcount_per_doc = wordcount_per_doc
        last_df_t = df_t
    if word_docid_pair == last_word_docid_pair:
        last_wordcount += wordcount
    else:
        words.append([last_word_docid_pair, last_wordcount, last_wordcount_per_doc, last_df_t])
        # set new values
        last_word_docid_pair = word_docid_pair
        last_wordcount = wordcount
        last_wordcount_per_doc = wordcount_per_doc
        last_df_t = df_t
    # 2. compute df_t
    df_t_dict[word].add(docid)
    # 3. compute the number of documents N in the collection
    docid_list.add(docid)

# append the last word, which is not handled by step 1
words.append([last_word_docid_pair, last_wordcount, last_wordcount_per_doc, last_df_t])

# 3. compute the number of documents N in the collection
N = len(docid_list)

for word_block in words:
    word, docid, wordcount, wordcount_per_doc, df_t = word_block[0][0], int(word_block[0][1]), int(word_block[1]), int(word_block[2]), int(word_block[3])
    # 5. set the final df_t value for each word
    df_t = len(df_t_dict[word])
    # 6. compute TF-IDF = wordcount x wordcount_per_doc x log10(N/df_t)
    word_block.append(wordcount * wordcount_per_doc * log10(N / df_t))
    TFIDF = word_block[4]
    # 7. OUTPUT DATA | pairs ((word, doc_ID), TF-IDF), one per line of stdout
    key_formated = '{:_<30}'.format("%s,%i" % (word, docid))
    print("%s\t%i\t%i\t%i\t%.*f" % (key_formated, wordcount, wordcount_per_doc, df_t, 5, TFIDF))

# TEST RESULTS - top 20 tf-idf for each document, written to words_top20_tfidf_docid<docid>.csv
for docid in docid_list:
    words_top20_tfidf = sorted([word_block for word_block in words if word_block[0][1] == docid],
                               key=lambda x: x[4], reverse=True)[:20]
    document_name = 'words_top20_tfidf_docid'
    document_name += "%s" % (docid)
    with open('%s.csv' % document_name, 'w') as f:
        csv.writer(f).writerow(words_top20_tfidf)
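To show what step 6 of the reducer computes, here is a made-up numeric example of the weighting formula exactly as it is written in the code; the values are arbitrary and not taken from my documents:

```python
from math import log10

# made-up values: the word occurs 12 times in the document, the document
# holds 5000 words, and the word appears in 1 of the N=2 documents
wordcount, wordcount_per_doc, df_t, N = 12, 5000, 1, 2

tfidf = wordcount * wordcount_per_doc * log10(N / df_t)
print(tfidf)   # 12 * 5000 * log10(2) ≈ 18061.8
```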
**COMMAND & OUTPUT:**
hadoop jar /home/hduser/Desktop/sample/hadoop-streaming-3.1.2.jar -D mapred.reduce.task=2 -input /user/defoe-robinson-103.txt -output /home/hduser/data -mapper "python3 mapper.py" -reducer "python3 reducer.py" -file /home/hduser/Desktop/mr_tfidf/tf-idf_mapper.py -file /home/hduser/Desktop/mr_tfidf/tf-idf_reducer.py
2019-10-08 10:38:25,816 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2019-10-08 10:38:26,387 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/home/hduser/Desktop/mr_tfidf/tf-idf_mapper.py, /home/hduser/Desktop/mr_tfidf/tf-idf_reducer.py, /tmp/hadoop-unjar2406562609442989570/] [] /tmp/streamjob5471183357876092640.jar tmpDir=null
2019-10-08 10:38:29,760 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2019-10-08 10:38:30,814 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2019-10-08 10:38:31,741 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hduser/.staging/job_1570272874011_0010
2019-10-08 10:38:32,382 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:32,958 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:32,993 WARN hdfs.DataStreamer: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
at java.lang.Thread.join(Thread.java:1326)
at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:986)
at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:640)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:810)
2019-10-08 10:38:33,115 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:33,646 INFO mapred.FileInputFormat: Total input files to process : 1
2019-10-08 10:38:33,917 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:34,064 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:34,099 WARN hdfs.DataStreamer: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
at java.lang.Thread.join(Thread.java:1326)
at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:986)
at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:640)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:810)
2019-10-08 10:38:34,105 INFO mapreduce.JobSubmitter: number of splits:2
2019-10-08 10:38:34,905 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:35,016 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1570272874011_0010
2019-10-08 10:38:35,018 INFO mapreduce.JobSubmitter: Executing with tokens: []
2019-10-08 10:38:36,228 INFO conf.Configuration: resource-types.xml not found
2019-10-08 10:38:36,232 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2019-10-08 10:38:36,585 INFO impl.YarnClientImpl: Submitted application application_1570272874011_0010
2019-10-08 10:38:36,938 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1570272874011_0010/
2019-10-08 10:38:36,999 INFO mapreduce.Job: Running job: job_1570272874011_0010
2019-10-08 10:39:00,083 INFO mapreduce.Job: Job job_1570272874011_0010 running in uber mode : false
2019-10-08 10:39:00,089 INFO mapreduce.Job: map 0% reduce 0%
2019-10-08 10:39:20,930 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
2019-10-08 10:39:21,021 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
2019-10-08 10:39:43,799 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
2019-10-08 10:39:43,807 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
2019-10-08 10:40:11,764 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
2019-10-08 10:40:11,779 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
2019-10-08 10:40:32,304 INFO mapreduce.Job: map 50% reduce 100%
2019-10-08 10:40:33,346 INFO mapreduce.Job: map 100% reduce 100%
2019-10-08 10:40:33,408 INFO mapreduce.Job: Job job_1570272874011_0010 failed with state FAILED due to: Task failed task_1570272874011_0010_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0
2019-10-08 10:40:33,877 INFO mapreduce.Job: Counters: 14
Job Counters
Failed map tasks=7
Killed map tasks=1
Killed reduce tasks=1
Launched map tasks=8
Other local map tasks=6
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=164125
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=164125
Total vcore-milliseconds taken by all map tasks=164125
Total megabyte-milliseconds taken by all map tasks=168064000
Map-Reduce Framework
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
2019-10-08 10:40:33,880 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
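For reference, one minimal way to check both scripts outside Hadoop is to emulate the streaming pipeline (cat input | mapper | sort | reducer) locally. This is only a sketch and assumes the input text and stopwords_en.txt sit next to mapper.py and reducer.py in the current directory:

```python
import subprocess

# emulate: cat defoe-robinson-103.txt | python3 mapper.py | sort | python3 reducer.py
with open('defoe-robinson-103.txt', 'rb') as src:
    mapped = subprocess.run(['python3', 'mapper.py'], stdin=src,
                            capture_output=True, check=True)
shuffled = subprocess.run(['sort'], input=mapped.stdout,
                          capture_output=True, check=True)
reduced = subprocess.run(['python3', 'reducer.py'], input=shuffled.stdout,
                         capture_output=True, check=True)

print(reduced.stdout.decode()[:500])   # first few reducer output lines
```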