我在python中的一个hadoop伪分布式节点上运行一个流式hadoop作业,还使用hadoop lzo在一个.lzo压缩的输入文件上生成拆分。
当使用小型压缩或未压缩的测试数据集时,一切都按预期工作;mapreduce输出与unix中简单的“cat | map | sort | reduce”管道中的输出匹配。-输入是否压缩。
但是,一旦我开始处理单个大的.lzo(预索引)数据集(~40gb压缩),并且作业被拆分为多个Map器,输出看起来就会被截断—只有前几个键值存在。
接下来是代码+输出-如您所见,这是测试整个过程的一个非常简单的计数。
直接unix管道对测试数据的输出(大数据集的子集);
lzop -cd objectdata_input.lzo | ./objectdata_map.py | sort | ./objectdata_red.py
3656 3
3671 3
51 6
hadoop作业对测试数据的输出(与上面的测试数据相同)
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py
3656 3
3671 3
51 6
现在,测试数据是来自真实数据集的一小部分行,所以我至少希望在针对完整数据集运行作业时,在结果输出中看到上面的键。然而,我得到的是;
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input_full.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py
1 40475582
12 48874
14 8929777
15 219984
16 161340
17 793211
18 78862
19 47561
2 14279960
20 56399
21 3360
22 944639
23 1384073
24 956886
25 9667
26 51542
27 2796
28 336767
29 840
3 3874316
30 1776
33 1448
34 12144
35 1872
36 1919
37 2035
38 291
39 422
4 539750
40 1820
41 1627
42 97678
43 67581
44 11009
45 938
46 849
47 375
48 876
49 671
5 262848
50 5674
51 90
6 6459687
7 4711612
8 20505097
9 135592
…根据数据集,密钥比我预期的要少很多。
我不太在意键本身-这一组可以预期给定的输入数据集,我更关心的是应该有更多的键,数以千计。当我在unix管道中针对数据集中的前2500万条记录运行代码时,我得到的密钥范围大约在1-7000之间。
所以,这个输出似乎只是我实际期望的前几行,我不知道为什么。我是否错过了整理许多第0000部分的文件?或者类似的?这只是我在家测试的单节点伪分布式hadoop,所以如果有更多的部分文件要收集,我不知道它们可能在哪里;它们不会出现在hdfs的retention\u counts目录中。
mapper和reducer代码如下所示-与许多单词计数示例相同;
objectdata\Map.py
# !/usr/bin/env python
import sys
RETENTION_DAYS=(8321, 8335)
for line in sys.stdin:
line=line.strip()
try:
retention_days=int(line[RETENTION_DAYS[0]:RETENTION_DAYS[1]])
print "%s\t%s" % (retention_days,1)
except:
continue
objectdata\红色.py
# !/usr/bin/env python
import sys
last_key=None
key_count=0
for line in sys.stdin:
key=line.split('\t')[0]
if last_key and last_key!=key:
print "%s\t%s" % (last_key,key_count)
key_count=1
else:
key_count+=1
last_key=key
print "%s\t%s" % (last_key,key_count)
所有这些都是在手动安装的hadoop1.1.2伪分布式模式下进行的,hadooplzo是从
https://github.com/kevinweil/hadoop-lzo
暂无答案!
目前还没有任何答案,快来回答吧!