public class FilteringMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
private static final Logger _logger = Logger.getLogger(FilteringMapper.class);
protected void map(LongWritable key, Text value, Context context) {
if(recordIsBad(value))
_logger.info(<log record data you care about>);
else
context.write(key, value);
}
private boolean recordIsBad(Text record){
//return true if record is bad by your standards
}
}
3条答案
按热度按时间uxhixvfz1#
因为您是基于字段的丢失来过滤记录,所以这是适合Map器实现的逻辑。java apiMap器可以如下所示:
此Map程序将仅根据您的标准进行筛选。如果您需要在Map器中对数据进行进一步的转换,那么很容易添加。
enyaitl32#
通过设置属性,可以跳过mapreduce作业中的记录
mapreduce.map.skip.maxrecords
和/或mapreduce.reduce.skip.maxgroups
值>0(默认值为0)。下面是一个简单的命令,您可以使用hadoop流在shell上运行,以启动一个mapreduce作业,该作业将忽略1000个mapper失败:
(其中
map.sh
以及reduce.sh
是可执行的bash脚本)。根据文件:
hadoop提供了一个选项,在处理map输入时可以跳过一组特定的错误输入记录。应用程序可以通过skipbadrecords类控制此功能。
当Map任务在特定输入上确定崩溃时,可以使用此功能。这通常是由于map函数中的错误造成的。通常,用户必须修复这些bug。然而,这有时是不可能的。例如,bug可能存在于第三方库中,而第三方库的源代码不可用。在这种情况下,即使多次尝试,任务也不会成功完成,作业也会失败。使用此功能,只有围绕坏记录的一小部分数据丢失,这对于某些应用程序(例如,对非常大的数据执行统计分析的应用程序)可能是可以接受的。
您可以在中看到与跳过记录相关的所有属性
mapred-default.xml
.另一个相关属性是
mapreduce.task.skip.start.attempts
:mr任务开始跳过记录之前失败的尝试次数(默认值为2)。8iwquhpp3#
处理损坏记录的最佳方法是在Map器或reducer代码中。您可以检测坏记录并忽略它,也可以通过抛出异常中止作业。您还可以使用计数器计算作业中错误记录的总数,以查看问题的严重程度。不过,在极少数情况下,您无法处理该问题,因为第三方库中存在一个无法在mapper或reducer中解决的bug。在这些情况下,可以使用hadoop的可选跳过模式来自动跳过坏记录。启用跳过模式时,任务会将正在处理的记录报告回tasktracker。当任务失败时,tasktracker将重试该任务,跳过导致失败的记录。由于需要额外的网络通信量和簿记来维护失败的记录范围,因此只有在任务失败两次之后,才会启用跳过模式。
因此,对于在错误记录上持续失败的任务,tasktracker将运行以下具有以下结果的任务尝试:
任务失败。
任务失败。
已启用跳过模式。任务失败,但失败记录由tasktracker存储。
跳过模式仍处于启用状态。通过跳过上一次尝试中失败的错误记录,任务成功。
跳过模式默认为关闭;您可以使用
SkipBadRecords
班级。值得注意的是,跳过模式在每个任务尝试中只能检测到一条坏记录,因此这种机制只适用于检测偶尔出现的坏记录(例如,每个任务检测几个坏记录)。您可能需要增加任务尝试的最大次数(通过mapred.map.max.attempts
以及mapred.reduce.max.attempts
)为跳过模式提供足够的尝试来检测和跳过输入拆分中的所有错误记录。hadoop检测到的坏记录将作为序列文件保存在_logs/skip
子目录。可在作业完成后(使用hadoop fs -text
,例如)。来自汤姆·怀特的“hadoop:权威指南”的文本