Emitting intermediate keys in a for loop in MapReduce: am I misunderstanding MapReduce? (Basic example included)

Posted by 6rqinv9w on 2021-05-29 in Hadoop

So I have a collection of e-mails, and I want to use them to output unique triplets (sender e-mail, recipient e-mail, timestamp), like this:

user1@stackoverflow.com    user2@stackoverflow.com    09/12/2009 16:45
user1@stackoverflow.com    user9@stackoverflow.com    09/12/2009 18:45
user3@stackoverflow.com    user4@stackoverflow.com    07/05/2008 12:29

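In the example above, user1 sent one e-mail to multiple recipients (user2 and user9). To store the recipients, I created a data structure, EdgeWritable (it implements WritableComparable), which holds the sender's and the recipient's e-mail addresses plus the timestamp.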
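For readers unfamiliar with custom keys, here is a minimal sketch of what a class like EdgeWritable has to provide. It is an assumption, not the poster's class: it stores plain Strings instead of the Text fields the question describes, and it only mirrors the WritableComparable contract (`write`, `readFields`, `compareTo`) with `java.io` types so it compiles without Hadoop. In a real job it would implement `org.apache.hadoop.io.WritableComparable<EdgeWritable>`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical stand-in for EdgeWritable (String fields instead of Text).
class EdgeSketch implements Comparable<EdgeSketch> {
    private String sender = "";
    private String recipient = "";
    private long ts = -1L;

    EdgeSketch() {} // Hadoop needs a no-arg constructor to deserialize keys

    EdgeSketch(String sender, String recipient, long ts) {
        this.sender = sender;
        this.recipient = recipient;
        this.ts = ts;
    }

    String sender() { return sender; }
    String recipient() { return recipient; }
    long ts() { return ts; }

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(sender);
        out.writeUTF(recipient);
        out.writeLong(ts);
    }

    // Read them back in the same order; Hadoop reuses key objects,
    // so every field must be overwritten here.
    void readFields(DataInput in) throws IOException {
        sender = in.readUTF();
        recipient = in.readUTF();
        ts = in.readLong();
    }

    // The sort order decides which keys are grouped into one reduce() call:
    // identical (sender, recipient, ts) triplets compare as 0.
    @Override
    public int compareTo(EdgeSketch o) {
        int c = sender.compareTo(o.sender);
        if (c != 0) return c;
        c = recipient.compareTo(o.recipient);
        if (c != 0) return c;
        return Long.compare(ts, o.ts);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof EdgeSketch && compareTo((EdgeSketch) obj) == 0;
    }

    // The default HashPartitioner uses hashCode(), so it must agree with
    // equals() for duplicate triplets to reach the same reducer.
    @Override
    public int hashCode() {
        return Objects.hash(sender, recipient, ts);
    }

    // Serialize and deserialize once, to check write/readFields agree.
    static EdgeSketch roundTrip(EdgeSketch e) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            e.write(new DataOutputStream(buf));
            EdgeSketch copy = new EdgeSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            return copy;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```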
My Mapper looks like this:

private final EdgeWritable edge = new EdgeWritable(); // Data structure for triplets.
private final NullWritable noval = NullWritable.get(); 

...

@Override
public void map(Text key, BytesWritable value, Context context)
        throws IOException, InterruptedException {

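    // getBytes() returns the backing array, which may be longer than the data;
    // read only the first getLength() bytes.
    byte[] bytes = value.getBytes();
    Scanner scanner = new Scanner(new ByteArrayInputStream(bytes, 0, value.getLength()), "UTF-8");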
    String from = null; // Sender's Email address
    ArrayList<String> recipients = new ArrayList<String>(); // List of recipients' Email addresses  
    long millis = -1; // Date

    // Parse information from file
    while (scanner.hasNextLine()) { // hasNextLine() pairs with nextLine()
        String line = scanner.nextLine();
        if (line.startsWith("From:")) {
            from = procFrom(stripCommand(line, "From:")); // Get sender e-mail address.
        } else if (line.startsWith("To:")) {
            procRecipients(stripCommand(line, "To:"), recipients); // Populate recipients into a list. 
        } else if (line.startsWith("Date:")) {
            millis = procDate(stripCommand(line, "Date:")); // Get timestamp.
        }

        if (line.equals("")) { // Empty line indicates the end of the header
            break;
        }
    }
    scanner.close();

    // Emit EdgeWritable as intermediate key containing Sender, Recipient and Timestamp. 
    if (from != null && recipients.size() > 0 && millis != -1) { 
        //EdgeWritable has 2 Text values (ew[0] and ew[1]) and a Timestamp. ew[0] is the sender, ew[1] is a recipient.
        edge.set(0, from); // Set ew[0]  
        for(int i = 0; i < recipients.size(); i++) {
            edge.set(1, recipients.get(i)); // Set edge from sender to each recipient i. 
            edge.setTS(millis); // Set date. 
            context.write(edge, noval); // Emit the edge as an intermediate key with a null value. 
        }
    }
}

...
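Calling `context.write` several times from a single `map()` call is ordinary: the canonical WordCount example in the Hadoop MapReduce tutorial calls `context.write(word, one)` inside a loop over the tokens of a line. Below is a plain-Java sketch of the fan-out step, assuming a `List` stands in for Hadoop's `Context` and a `String[] {sender, recipient, ts}` stands in for EdgeWritable; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one map() call: one e-mail header in,
// one key per recipient out (the NullWritable value is omitted).
class FanOutSketch {
    static List<String[]> map(String from, List<String> recipients, long millis) {
        List<String[]> emitted = new ArrayList<>(); // stands in for Context
        if (from == null || recipients.isEmpty() || millis == -1) {
            return emitted; // incomplete header: emit nothing, as in the question
        }
        for (String to : recipients) {
            // In Hadoop, context.write() serializes the key right away, which is
            // why the question's Mapper can reuse a single EdgeWritable instance.
            emitted.add(new String[] {from, to, Long.toString(millis)});
        }
        return emitted;
    }
}
```

For the first e-mail in the sample data, this produces two keys, one for user2 and one for user9, both sharing the sender and timestamp.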

My Reducer just formats the date and outputs the edge:

@Override
public void reduce(EdgeWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    // Read from the incoming key; the Mapper's `edge` field is not visible in the Reducer.
    String date = MailReader.sdf.format(key.getTS());
    out.set(key.get(0) + " " + key.get(1) + " " + date); // out: a Text field of the Reducer.
    context.write(noval, out); // noval: a NullWritable field, declared as in the Mapper.
}
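The "unique triplets" requirement is met by the shuffle, not by the loop: the framework sorts map output and calls `reduce()` once per distinct key, so duplicate edges collapse into one output line. A plain-Java sketch of that grouping, assuming keys are modelled as tab-separated strings instead of EdgeWritable, a `TreeMap` stands in for the sort/group phase, and `MM/dd/yyyy HH:mm` in UTC is a guess at the pattern behind `MailReader.sdf`:

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.TreeMap;

// Hypothetical model of shuffle + reduce for the question's job.
class DedupSketch {
    static List<String> shuffleAndReduce(List<String> mapOutputKeys) {
        // Sort and group equal keys; the count plays the role of the
        // Iterable<NullWritable> values, which the reducer ignores.
        TreeMap<String, Integer> groups = new TreeMap<>();
        for (String k : mapOutputKeys) {
            groups.merge(k, 1, Integer::sum);
        }
        SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy HH:mm", Locale.ROOT);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        List<String> output = new ArrayList<>();
        // One "reduce call" per distinct key, so each triplet is written once.
        for (String key : groups.keySet()) {
            String[] f = key.split("\t"); // sender, recipient, millis
            String date = sdf.format(new Date(Long.parseLong(f[2])));
            output.add(f[0] + " " + f[1] + " " + date);
        }
        return output;
    }
}
```

Feeding it the same user1 to user2 edge twice plus the user1 to user9 edge yields two lines, in key order.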

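Using EdgeWritable as the intermediate key and NullWritable as the value (in the Mapper) is a requirement; I'm not allowed to use any other approach. This is my first Hadoop/MapReduce program, and I just want to know whether I'm heading in the right direction. I've looked at many MapReduce examples online and have never seen key/value pairs emitted inside a for loop the way I do it. I feel like I'm missing some trick here, but using a for loop this way is the only approach I can think of.
Is this "bad"? I hope this is clear, but please let me know if any further clarification is needed.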

Answer 1, by n3h0vuf2:

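The map method is called once per record, so on each call the array list holds only that one record. Declare the array list at the class level so that it stores the values of all records. Then, in the cleanup method, you can run the emit logic you wrote in map. Try this and let me know if it works.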
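A plain-Java sketch of the answer's suggestion, assuming Hadoop's Mapper lifecycle (`setup()`, then `map()` once per record, then `cleanup()` once per task). The class, field, and method names below are hypothetical stand-ins, and a `List<String>` plays the role of `Context`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: map() only buffers parsed headers in a class-level
// list; cleanup() emits one edge per (sender, recipient) pair at the end.
class BufferedMapperSketch {
    private static final class Mail {
        final String from;
        final List<String> to;
        final long millis;

        Mail(String from, List<String> to, long millis) {
            this.from = from;
            this.to = to;
            this.millis = millis;
        }
    }

    private final List<Mail> buffered = new ArrayList<>(); // survives across map() calls
    private final List<String> emitted = new ArrayList<>(); // stands in for Context

    void map(String from, List<String> recipients, long millis) {
        if (from != null && !recipients.isEmpty() && millis != -1) {
            buffered.add(new Mail(from, new ArrayList<>(recipients), millis));
        }
    }

    void cleanup() {
        for (Mail m : buffered) {
            for (String to : m.to) {
                emitted.add(m.from + " " + to + " " + m.millis);
            }
        }
        buffered.clear();
    }

    List<String> emitted() { return emitted; }
}
```

Either way the same (sender, recipient, timestamp) pairs reach the shuffle; buffering only delays them, and it keeps every parsed header of the input split in memory until `cleanup()` runs.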
