我的群集中有两个文件 File A
以及 File B
使用以下数据-
文件a
# Format:
# Food Item | Is_A_Fruit (BOOL)
Orange | Yes
Pineapple | Yes
Cucumber | No
Carrot | No
Mango | Yes
文件b
# Format:
# Food Item | Vendor Name
Orange | Vendor A
Pineapple | Vendor B
Cucumber | Vendor B
Carrot | Vendor B
Mango | Vendor A
基本上我想知道每个供应商卖多少水果?
预期产量:
Vendor A | 2
Vendor B | 1
我需要使用hadoop streaming python map reduce来实现这一点。
我读过如何做一个基本的字数计算,我从 sys.stdin
发出 k,v
然后对减速机进行减速机减速机。
我该如何处理这个问题?
我主要关心的是如何读取多个文件,然后在hadoop流媒体中比较它们。
我可以在普通的python中实现这一点(即没有mapreduce和hadoop,这很简单),但是对于我所拥有的庞大数据来说,这是不可行的。
4条答案
按热度按时间z2acfund1#
您将只向hadoop提供包含这些文件的目录。hadoop框架将阅读它们。你没有。
hadoop将把您编写的map类应用于所有文件内容。
然后,hadoop将把您编码的reduce类应用于map类的所有输出。
5q4ezhmt2#
请看一看这个例子,因为它与你要找的东西非常直接相关。
shstlldc3#
一种方法是将其作为两项工作来完成。
过滤fileb,以便只保留包含水果的行
map1:“food item”和数据来自哪个文件的复合键。对“food item”进行分区,并对行是否包含“is\u a\u fruit”信息进行二次排序(以确保每个food item的reducer首先读取该信息)。
reduce1:对于二次排序,排序数据中的第一行将指示此食物项是否是水果(在这种情况下,reducer将输出它)或不是水果(在这种情况下,它不会)。
使用供应商作为键来计算每个供应商的水果数量。
这里第一个作业的mapreduce输出现在具有与fileb相同的结构,但是所有行都是结果,因此这更像wordcount,以vendor name作为键,然后计算行数。
如果您想要独特的水果,您可能需要再次使用二次排序,以消除将与每个供应商关联的所有水果加载到内存中的需要。
也就是说:@cabad建议的解决方案最好是文件足够小。
如果不是的话,使用二次排序的方法是最好的。请看一下@simplefish在这里的答案中建议的这个教程,它介绍了如何在分区内进行二次排序(这些关键字将为您指明正确的方向,以便执行您想要的操作:保证与传递给reducer的给定键相关联的数据的顺序)。
最后一点:您的问题不是“如何读取多个文件”,因为您设计的任何解决方案都不能依赖于知道输入来自哪个文件(您需要依赖于数据的结构,尽管在本例中这不是问题)。
gudnpqoy4#
文件真的有那么大吗?我会把它放在分布式缓存中,然后从那里读取。要将其放入分布式缓存,请在hadoop流调用中使用以下选项:
(我想下面的方法也行,但我没有试过:)
请注意
#fileA
是用于使文件可供Map程序使用的名称。然后,在Map器中,您将从
sys.stdin
(asuming您使用-input '/user/foo/FileB'
)要阅读filea,您应该执行以下操作:现在,我想你已经想到了这一点,但对我来说,有这样一个制图器是有意义的:
打开文件A
逐行(在循环中)读取filea并将其加载到Map中,这样您就可以轻松地查找键并找到其值(yes,no)。
让你的主循环读取标准数据。在循环中,对于每一行(在fileb中),检查您的Map(参见步骤2)以确定您是否有水果。。。等。