hdfs文件压缩与连续摄取

o2rvlv0m  于 2021-05-31  发布在  Hadoop
关注(0)|答案(2)|浏览(455)

我们在hdfs中有几个表,它们每天大约获得4万个新文件。我们需要每两周压缩一次这些table,为此我们需要停止摄入。
我们有Spark摄取从Kafka获得数据,并添加到hdfs(Hive外部表)每30分钟。数据一被接收就被查询,我们的sla不到一个小时,所以我们不能增加批处理间隔。
表在两个字段上进行分区,我们不断地得到较旧的数据,因此在每个注入批处理期间,大多数分区都会更新
例如:/user/head/warehouse/main\u table/state=ca/store=macys/part-00000-017258f8-aaa-bbb-ccc-wefdsds.c000.snappy.parquet
我们正在寻找方法来减少文件创建的数量,但即使这样,我们将不得不做压缩每3/4周,如果不是两个。
由于大多数分区都是不断更新的,所以我们需要在开始压缩之前停止注入(大约1天),这会影响到我们的用户。
我正在寻找方法,以压缩自动不停止摄入?

kzipqqlq

kzipqqlq1#

选择的分区方案有点不幸。不过,你还是可以做一些事情。我所依赖的事实是,您可以在hive中原子地更改分区的位置(altertable。。。分区。。。设置位置):
将分区的hdfs目录复制到其他位置
压缩复制的数据
复制自步骤1以来接收的新文件
“更改表格…”。。。分区。。。设置位置“以将配置单元指向新的压缩位置。
开始摄取到这个新的位置(如果这个步骤很棘手的话,你也可以用压缩的版本替换原来分区位置中的“小”文件,然后做“alter table…”。。。分区。。。再次设置location“以将配置单元指向原始分区位置。
您必须使这个进程连续地逐分区迭代运行。

sxissh06

sxissh062#

谢谢你的建议,非常感谢。
我对hdfs的概念非常陌生,所以请不要介意基本问题,
在执行将未压缩文件与压缩文件交换(alter table。。。分区。。。设置位置)。我相信这些查询可能会失败。我们能把影响降到最低吗?
将分区的hdfs目录复制到其他位置
因为我们在一个表中有两个分区,state和store,所以我需要遍历每个子分区吗?
/tablename/state=ca/store=macys/file1.parquet/tablename/state=ca/store=macys/file2.parquet
/tablename/state=ca/store=jcp/file2.parquet/tablename/state=ca/store=jcp/file2.parquet
/tablename/state=ny/store=macys/file1.parquet/tablename/state=ny/store=macys/file2.parquet
/tablename/state=ny/store=jcp/file2.parquet/tablename/state=ny/store=jcp/file2.parquet

For each state
    for each store
        get list of files in this dir to replace later
        compact 
            /tableName/state=$STATE/store=$STORE (SPARK JOb?)
        replace uncompacted files with compacted files
        alter table ... partition ... set location

我更希望您在第5步中的其他建议“也可以用压缩版本替换原始分区位置中的“小”文件”
我将如何继续实现它,最好是使用脚本、scala或其他编程语言。我有基本的脚本知识,有很好的java经验,对scala还不熟悉,但几天内就能学会。
你好,p

相关问题