如何在hadoop作业中保持状态?

db2dz4w8  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(347)

我正在开发一个hadoop程序,计划每天运行一次。它需要一堆json文档,每个文档都有一个时间戳,显示添加文档的时间。我的程序应该只处理自上次运行以来添加的文档。所以,我需要保持一个状态,它是一个时间戳,显示hadoop作业最后一次运行的时间。我正在考虑将此状态存储在sql server中,并在我的作业的驱动程序中查询它。这是一个好的解决方案还是一个更好的解决方案?
p、 我的hadoop作业正在hdinsight上运行。已经说过,仍然可以从我的驱动程序查询sql server?

jdgnovmf

jdgnovmf1#

您可以使用日期时间重命名结果文档,然后您的程序可以根据文档的名称处理该文档。

bxjv4tth

bxjv4tth2#

对于在aws(amazonwebservices)中运行的工作流,对于存储在s3中的数据,我们已经解决了这个问题。
我们的设置:
数据存储:aws s3
数据摄取机制:flume
工作流管理:oozie
文件状态存储:mysql
问题:
我们使用flume将数据摄取到amazons3中。所有摄取的数据都在同一个文件夹中(s3是一个键/值存储,没有文件夹的概念)。这里的文件夹意味着,所有的数据都有相同的前缀。例如,/tmp/1.txt,/tmp/2.txt等,这里/tmp/是键前缀)。
我们有一个etl工作流,计划每小时运行一次。但是,由于所有数据都被摄取到同一个文件夹中,因此我们必须区分已处理和未处理的文件。
例如,对于第一个小时,接收的数据是:

/tmp/1.txt
/tmp/2.txt

当工作流第一次启动时,它应该处理来自“1.txt”和“2.txt”的数据,并将它们标记为已处理。
如果在第二个小时内,接收的数据是:

/tmp/3.txt
/tmp/4.txt
/tmp/5.txt

然后,2小时后文件夹中的总数据将为:

/tmp/1.txt
/tmp/2.txt
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt

因为“1.txt”和“2.txt”已经被处理并标记为已处理,所以在第二次运行期间,作业应该只处理“3.txt”、“4.txt”和“5.txt”。
解决方案:
我们开发了一个图书馆(我们称之为 FileManager ),用于管理已处理文件的列表。我们将这个库作为java操作插入到oozie工作流中。这是工作流的第一步。
这个库还负责忽略当前由flume写入的文件。flume将数据写入文件时,这些文件的后缀为“\u current”。因此,这些文件被忽略处理,直到它们被完全写入。
摄取的文件是以时间戳作为后缀生成的。例如“每小时\u feed.1234567”。因此,文件名是按其创建的升序排列的。
为了获得未处理文件的列表,我们使用了s3的使用标记进行查询的功能(例如,如果您在一个文件夹中有10000个文件,如果您将标记指定为第5000个文件的名称,那么s3将返回从5001到10000的文件)。
每个文件有以下3个状态:
成功-成功处理的文件
错误-已拾取以进行处理的文件,但处理这些文件时出错。因此,需要再次提取这些文件进行处理
进行中-已提取以进行处理且作业当前正在处理的文件
对于每个文件,我们在mysql数据库中存储了以下详细信息:
文件名
上次修改时间-我们用它来处理一些角落的情况
文件的状态(进行中、成功、错误)
这个 FileManager 暴露以下接口: GetLatestFiles :返回最新未处理文件的列表 UpdateFileStatus :处理完文件后,更新文件的状态
以下是识别尚未处理的文件的步骤:
查询数据库(mysql),查看最后一个状态为成功的文件(查询: order by created desc ).
如果第一步返回一个文件,那么查询s3,将文件标记设置为最后一个成功处理的文件。这将返回最后一个成功处理的文件之后接收的所有文件。
同时查询数据库以检查是否有任何文件处于错误状态。这些文件需要重新处理,因为以前的工作流没有成功处理它们。
返回从步骤2和3获得的文件列表(在返回之前,将其状态标记为“正在进行”)。
作业成功完成后,将所有已处理文件的状态更新为success。如果处理文件时出错,则将所有文件的状态更新为“错误”(以便下次可以提取这些文件进行处理)
我们使用oozie进行工作流管理。oozie工作流有以下步骤:
步骤1:获取下一组要处理的文件,将它们的每个状态标记为“进行中”,并将它们传递到下一个阶段
第二步:处理文件
步骤3:更新处理的状态(成功或错误)
重复数据消除:当您实现这样一个库时,有可能会重复记录(在某些情况下,同一个文件可能会被提取两次进行处理)。我们已经实现了一个重复数据消除逻辑来删除重复记录。

d8tt03nd

d8tt03nd3#

驱动程序检查最后一个运行时戳是一个很好的方法,但是要存储最后一个运行时戳,可以使用hdfs中的临时文件。

相关问题