我在同一时间运行了大量的jobcontrols,它们都具有相同的controlledjobs集。每个jobcontrol都处理一组不同的输入/输出文件(按日期范围),但它们都是同一类型的。我观察到的问题是,reduce步骤接收的数据被设计成由处理不同日期范围的reducer处理。日期范围由作业设置,用于确定输入和输出,并从reducer中的上下文读取。
如果我按顺序提交jobcontrols,这会停止,但这不好。这是我应该用自定义分区器解决的问题吗?如果我不知道哪个减速机处理的是我当前的日期范围,我该如何确定一个键的正确减速机?为什么示例化的reducer没有被锁定到它们的jobcontrol?
我已经编写了所有的jobcontrols、jobs、map和reduce,它们都是用java实现的。
我用的是2.0.3-alpha和Yarn。那有什么关系吗?
我在分享代码时必须小心一点,但这里有一个经过净化的Map器:
protected void map(LongWritable key, ProtobufWritable<Model> value, Context context)
throws IOException, InterruptedException {
context.write(new Text(value.get().getSessionId()),
new ProtobufModelWritable(value.get()));
}
和减速器:
protected void reduce(Text sessionId, Iterable<ProtobufModelWritable> models, Context context)
throws IOException, InterruptedException {
Interval interval = getIntervalFromConfig(context);
Model2 model2 = collapseModels(Iterables.transform(models, TO_MODEL));
Preconditions.checkArgument(interval.contains(model2.getTimeStamp()),
"model2: " + model2 + " does not belong in " + interval);
}
private Interval getIntervalFromConfig(Context context) {
String i = context.getConfiguration().get(INTERVAL_KEY);
return Utils.interval(i);
}
1条答案
按热度按时间bis0qfac1#
作为参考,我用两件事修正了这个问题。最重要的问题是,当我为每个间隔创建单独的作业时,我给每个作业取了相同的名称。通过将序列化间隔附加到作业名称,hadoop知道要将map结果发送到哪个reducer。
此外,我开始为每个作业创建单独的配置对象,而不是复制初始配置。这可能是不必要的,但至少我知道我不能犯错误,开始共享相同的配置对象。