这是我的设想。我有一个工作,处理了大量的csv数据,并写出来使用avro文件除以日期。我得到了一个小文件,我想用它来更新其中的一些文件,并添加一个额外的条目,以便在需要时运行第二个作业,而不是再次重新处理整个数据集。
这是一种想法:
job1:处理大量csv数据,将其写入压缩的avro文件中,按输入日期拆分成文件。源数据不除以日期,因此此作业将执行此操作。
job2(在job1运行之间根据需要运行):处理小的更新文件,并使用它将条目添加到相应的avro文件中。如果不存在,请创建一个新文件。
job3(总是运行):从job1(也可能是job2)的输出生成一些用于报告的度量。
所以,我必须这样写一个java作业。我的第一份工作似乎很好。3也是。我不知道如何接近工作2。
我是这么想的:
使用分布式缓存传递更新文件。分析此文件以生成job类中的日期列表,并使用此列表筛选job1中的文件,job1将作为此作业的输入。
在mapper中,访问分布式更新文件,并将它们添加到我读入的avro对象集合中。如果文件还不存在呢?这有用吗?
使用reducer编写新的对象集合
你会这样做吗?如果没有更好的方法是什么?合路器在这里有意义吗?我觉得答案是否定的。
提前谢谢。
3条答案
按热度按时间idfiyjo81#
您可以遵循以下方法:
1) 在所有csv文件上运行job1
2) 在小文件上运行job2并创建新输出
3) 对于更新,您需要再运行一个作业,在此作业中,在setup()方法中加载job2的输出,并将job1的输出作为map()输入。然后编写更新逻辑并生成最终输出。
4) 然后运行job3进行处理。
根据我的说法,这是可行的。
5anewei62#
只有一个疯狂的想法:为什么你真的需要更新
job1
输出?job1为date生成一个文件。为什么不添加像随机uuid这样的唯一后缀呢?
job2处理“更新”信息。也许几次吧。输出文件命名的逻辑是相同的:基于日期的名称和唯一的后缀。
job3收集job1和job2输出,按日期前缀将它们分组,并使用所有后缀作为输入。
如果目标是基于日期的分组,那么对我来说,这里有很多优势,显而易见:
你不在乎“你是否有这个日期的job1输出”。
您甚至不关心是否需要用多个job2结果更新一个job1输出。
您不会打破hdfs方法的“无文件更新”限制,它具有“一次写入”直接处理的全部功能。
你只需要一些具体的
InputFormat
为了你的工作。看起来没那么复杂。如果需要合并来自不同来源的数据,没问题。
job3本身可以忽略从多个源接收数据的事实。
InputFormat
你应该小心点。几个job1输出可以以相同的方式组合。
限制:
这可能会产生比您负担得起的大数据集和多个过程更多的小文件。
你需要定制
InputFormat
.对我来说,如果我正确理解你的情况,你可以/需要按日期分组文件作为job3的输入,这是一个很好的选择。
希望这对你有帮助。
thtygnil3#
对于job2,可以读取更新文件以过滤驱动程序代码中的输入数据分区,并在输入路径中设置它。可以按照当前方法将更新文件作为分发缓存文件读取。如果无法读取更新文件而希望作业失败,请在安装方法本身中引发异常。
如果更新逻辑不需要在reduce端进行聚合,请将job2设置为map only job。您可能需要构建逻辑来标识job3中更新的输入分区,因为它将接收job1输出和job2输出。