我有一个文件,它有两列id和timestamp。我正在计算每个值的会话数-由30分钟以上的不活动状态决定。但是,我在使用流命令时遇到了问题。下面是一个例子。
id,time
1,2015-02-05 01:01:01
1,2015-02-05 01:02:01
3,2015-02-05 02:01:01
3,2015-02-05 02:01:02
我知道我的Map器和减速机工作正常b/c我得到正确的结果时,我只使用一个减速机。我的问题是,当我需要使用多个reducer时,我尝试使用partitioner将map输出的第一个值发送给一个reducer,并按map输出中的第二个值对其排序。对如何做到这一点有什么建议吗?
这就是我要尝试的。
hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p470.103/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.3.0-mr1-cdh5.1.2.jar \
-Dmapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D stream.map.output.field.separator=, \
-D stream.num.map.output.key.fields=2 \
-D mapred.text.key.partitioner.options=-k1,1 \
-Dmapred.text.key.comparator.options=-k2,2 \
-input /in/ \
-output /out/ \
-mapper mapper1.py \
-file ${DIR}mapper.py \
-reducer reducerA.py \
-file ${DIR}reducer.py
1条答案
按热度按时间xienkqul1#
将“-dmapred.text.key.comparator.options=-k2,2”更改为“-dmapred.text.key.comparator.options=-k1,2”,这样减速机接收的记录首先按id排序,然后按时间排序。此外,您的reducer还需要比较记录的连续键(id),并且只对id相等的记录进行会话计数。