我遇到一种情况,我需要用以下格式处理一个非常大的文本文件:
ID \t time \t duration \t Description \t status
我想利用mapreduce来帮助我处理这个文件。我知道mapreduce是基于键、值对工作的。mapper将输出key和一些值,mapreduce将确保所有相同的key都在一个reducer中结束。
我想要在一个减速机中结束的是有时间的行之间的间隔在1小时之内。然后在reducer中,我想访问所有其他信息,如id,duration,status来做其他事情。所以我猜输出的值是一个列表还是什么?
我有一些python代码来处理输入数据。Map器.py
# !/usr/bin/env python
import sys
import re
for line in sys.stdin:
line=line.strip()
portions=re.split(r'\t+',line)
time=portions[1]
# output key,value by print to stdout for reducer.py to read in.
请注意,我的数据集中的时间已经是posix时间格式。
如何在mapper中输出键、值对来实现这一点?
我对mapreduce/hadoop还是个新手,非常感谢大家的帮助。提前谢谢!
1条答案
按热度按时间tvz2xvvm1#
这里有一个策略:
从Map器:发出每条记录的三个副本并使用二次排序:
((复合键),值)=
((消息时间-一小时,当前消息的精确时间),消息)
((信息时间,信息准确时间),信息)
((信息小时+一小时,信息准确时间),信息)
现在:您需要标准的二次排序:
将partitioner设置为仅键的前半部分(消息的小时数)
将GroupingComparator设置为仅键的前半部分(消息的小时数)
将排序比较器设置为(消息的小时数,消息的精确时间)
在reducer中:每个reducer组在消息的精确时间+/-60到120分钟内接收所有消息。reducer按排序顺序查看所有“消息的精确时间”。因此,您可以保持一个滑动窗口的所有消息在过去60分钟内查看每个减速机
注:以上假设60分钟消息的数据可以放在单个任务的内存中。否则,作为窗口功能的一部分,您需要将数据写入磁盘。
更新操作要求进一步澄清的窗口,所以我们来了。
从Map器发出的键的Angular 考虑:每个输入记录有三个键。现在在reducer上,这意味着每个输入记录出现在三个不同的组中。原因是我们需要根据每个输入记录同时考虑超前和滞后记录。因此,现在我们让每个组都可以访问所有输入记录,这些记录可能在最早记录的60分钟内,也可能在最新记录的60分钟内。因为这些记录是按每小时最早的一秒分组的:这意味着-60(最小值)到+120(最大值)与属于给定小时组的任何记录相比。