可以删除底层的parquet文件而不会对deltalake\u delta\u日志产生负面影响吗

qhhrdooz  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(367)

使用 .vacuum() 在deltalake上,表非常慢(参见emr和s3上的delta-lake(oss)表-没有作业时真空需要很长时间)。
如果我手动删除了底层Parquet文件而没有添加新的 json 日志文件或添加新的 .checkpoint.parquet 归档并更改 _delta_log/_last_checkpoint 指向它的文件;对deltalake表有什么负面影响(如果有的话)?
显然,时间旅行,即加载依赖于我删除的Parquet文件的表的早期版本,将不起作用。我想知道的是,在当前版本的deltalake表中读、写或附加什么问题?
我想在Pypark做什么:


### Assuming a working SparkSession as `spark`

from subprocess import check_output
import json
from pyspark.sql import functions as F

awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)

s3_bucket_path = "s3a://my_s3_bucket/delta/"

df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp")/1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    .where(F.col("delDateDiffDays") < -7 )
)

这里有很多选择。其中之一可能是:

df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)

我可以阅读的地方 files_to_delete.csv 然后使用一个简单的bash for 循环将每个Parquet文件s3路径传递给 aws s3 rm 命令逐个删除文件。
这可能比 vacuum() ,但至少它在工作时不会消耗集群资源。
如果我这样做,我是否也必须:
写一个新的 _delta_log/000000000000000#####.json 正确记录这些更改的文件?
写一个新的 000000000000000#####.checkpoint.parquet 正确记录这些更改并更改 _delta_log/_last_checkpoint 文件指向那个 checkpoint.parquet 文件?
第二种选择会更容易。
但是,如果我只是删除文件,并且不更改文件中的任何内容,将不会有任何负面影响 _delta_log ,那就最简单了。

jaql4c8m

jaql4c8m1#

tldr公司。回答这个问题。
如果我手动删除了底层的parquet文件,并且没有添加新的json日志文件或添加新的.checkpoint.parquet文件,并且更改指向它的\u delta\u log/\u last\u checkpoint文件;对deltalake表有什么负面影响(如果有的话)?
是的,这可能会损坏增量表。
让我简单回答一下delta lake如何使用 _delta_log .
如果你想看这个版本 x 然后它将转到所有版本的delta日志 1x-1 并将创建一个Parquet文件的运行总和来读取。此过程的摘要另存为 .checkpoint 以后每10个版本,使这个过程的运行效率总和。

我说的这个流水账是什么意思?

假设,
版本1日志显示,添加 add file_1, file_2, file_3 第二版日志说,添加 delete file_1, file_2, and add file_4 因此,当阅读第2版时,总指令将是 add file_1, file_2, file_3 -> delete file_1, file_2, and add file_4 因此,读取的结果文件将是文件3和文件4。

如果从文件系统中删除Parquet呢?

比如在版本3中,你删除了 file_4 从文件系统。如果你不使用 .vacuum 那么delta log就不会知道了 file_4 如果不存在,它将尝试读取它并将失败。

相关问题