When the output data is large, Hadoop's built-in compression can be used to compress it, reducing both network transfer and storage costs.
Goal: implement WordCount and write the word-frequency results to HDFS in a compressed format.
#!/usr/bin/python
# map.py
import os
import sys

def get_file_handler(f):
    # Open a single local file for reading.
    return open(f, 'r')

def get_cachefile_handlers(f):
    # -cacheArchive extracts the archive into a local directory,
    # so collect a handle for every file inside it.
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(os.path.join(f, fd)))
    return f_handlers_list

def read_local_file_func(f):
    # Load the whitelist words from the extracted archive directory.
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            # if word != "" and (word in word_set):  # enable to keep only whitelist words
            if word != "":
                print "%s\t%s" % (word, 1)

if __name__ == "__main__":
    # Dispatch to the function named on the command line, e.g.
    #   python map.py mapper_func WH.gz
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
#!/usr/bin/python
# red.py
import sys

def reduer_func():
    # Input arrives sorted by key, so all counts for one word are adjacent.
    current_word = None
    count_pool = []
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word is None:
            current_word = word
        if current_word != word:
            # A new key begins: emit the accumulated count for the previous word.
            print "%s\t%s" % (current_word, sum(count_pool))
            current_word = word
            count_pool = []
        count_pool.append(int(val))
    # Flush the last word.
    if current_word is not None:
        print "%s\t%s" % (current_word, sum(count_pool))

if __name__ == "__main__":
    # Dispatch to the function named on the command line, e.g.
    #   python red.py reduer_func
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
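Before submitting the job, the two scripts can be chained together locally to simulate the map → shuffle → reduce flow. This is only a quick sanity-check sketch, not part of the original run; it assumes a local copy of The_Man_of_Property.txt and an already-extracted whitelist directory named WH.gz in the current directory:

# Local smoke test: pipe a sample of the input through the mapper,
# sort to emulate the shuffle, then run the reducer.
head -n 1000 The_Man_of_Property.txt \
    | python map.py mapper_func WH.gz \
    | sort -k1,1 \
    | python red.py reduer_func \
    | head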
Use a script to launch map.py and red.py in one step. How should the script be written?
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"
# Delete the output directory if it already exists, so the job does not fail because the path exists
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func WH.gz" \
-reducer "python red.py reduer_func" \
-jobconf "mapred.reduce.tasks=5" \
-jobconf "mapred.job.name=cachefile_demo" \
-jobconf "mapred.compress.map.output=true" \
-jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
-jobconf "mapred.output.compress=true" \
-jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
-cacheArchive "hdfs://master:9000/test/white.tar.gz#WH.gz" \
-file "./map.py" \
-file "./red.py"
-mapper "python map.py mapper_func WH.gz"
-mapper specifies the map command;
python map.py mapper_func means: run map.py and call its mapper_func function;
the trailing WH.gz is an argument to that function, and it corresponds to the #WH.gz alias in -cacheArchive;
-cacheArchive "hdfs://master:9000/test/white.tar.gz#WH.gz"
WH.gz is the local alias under which the extracted white.tar.gz appears in each task's working directory.
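For reference, the archive behind -cacheArchive would have been created and uploaded to HDFS roughly as follows. This is a sketch: the local file name white_list.txt is an assumption; only the HDFS path /test/white.tar.gz comes from the command above.

# Keep the whitelist file(s) at the top level of the archive, so that the
# mapper's os.listdir() on the extracted WH.gz directory sees them directly.
tar -czf white.tar.gz white_list.txt
hadoop fs -put white.tar.gz /test/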
-jobconf: configuration properties passed along with the job submission. Commonly used properties:

| Property | Meaning |
| --- | --- |
| mapred.map.tasks | number of map tasks |
| mapred.reduce.tasks | number of reduce tasks |
| mapred.job.name | job name |
| mapred.job.priority | job priority |
| mapred.compress.map.output | whether the map output is compressed |
| mapred.map.output.compression.codec | compression codec for the map output |
| mapred.output.compress | whether the reduce (final) output is compressed |
| mapred.output.compression.codec | compression codec for the reduce (final) output |
| mapred.job.map.capacity | maximum number of map tasks running concurrently |
| mapred.job.reduce.capacity | maximum number of reduce tasks running concurrently |
| mapred.task.timeout | maximum time a task may go without responding (no input read or output written) |
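Note that the mapred.* names above are old-style property names; Hadoop 2.6.5 still accepts them but logs deprecation warnings and maps them internally to new-style names. If preferred, the compression-related -jobconf lines could be written with the new names instead (an equivalent sketch of the same settings):

-jobconf "mapreduce.job.reduces=5" \
-jobconf "mapreduce.map.output.compress=true" \
-jobconf "mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec" \
-jobconf "mapreduce.output.fileoutputformat.compress=true" \
-jobconf "mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec"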
After run.sh finishes, check the result: hadoop fs -ls /output_cachearchive_broadcast
The output is five gzip-compressed part files (one per reduce task). To inspect their contents:
1. With cat: hadoop fs -cat /output_cachearchive_broadcast/part-00000.gz | head
This prints garbled bytes, because cat dumps the raw compressed data; another command is needed.
2. With text: hadoop fs -text /output_cachearchive_broadcast/part-00000.gz | head
-text recognizes the compression codec, so the decompressed file content is readable.
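A part file can also be pulled down and inspected with ordinary gzip tools (using the same output path as above):

# Download one compressed part file and decompress it locally
hadoop fs -get /output_cachearchive_broadcast/part-00000.gz .
gunzip -c part-00000.gz | head
# or stream it without downloading
hadoop fs -cat /output_cachearchive_broadcast/part-00000.gz | gunzip -c | head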
Done: WordCount is implemented, and the word-frequency results are written to HDFS in compressed form.
How can these compressed files be decompressed? With another one-step script.
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_PATH="/output_cachearchive_broadcast"
OUTPUT_PATH="/output_cat"
#$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 2.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_PATH \
-output $OUTPUT_PATH \
-mapper "cat" \
-jobconf "mapred.reduce.tasks=0"
Run run.sh. Hadoop Streaming transparently decompresses the gzip input, and with a pass-through "cat" mapper and zero reduce tasks the records are written back out as plain text.
hadoop fs -ls /output_cat
1. Plain cat now works: hadoop fs -cat /output_cat/part-00000 | head
Original article: https://blog.csdn.net/weixin_44775255/article/details/125601898