使用 .vacuum()
在deltalake上,表非常慢(参见emr和s3上的delta-lake(oss)表-没有作业时真空需要很长时间)。
如果我手动删除了底层Parquet文件而没有添加新的 json
日志文件或添加新的 .checkpoint.parquet
归档并更改 _delta_log/_last_checkpoint
指向它的文件;对deltalake表有什么负面影响(如果有的话)?
显然,时间旅行,即加载依赖于我删除的Parquet文件的表的早期版本,将不起作用。我想知道的是,在当前版本的deltalake表中读、写或附加什么问题?
我想在Pypark做什么:
### Assuming a working SparkSession as `spark`
from subprocess import check_output
import json
from pyspark.sql import functions as F
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)
s3_bucket_path = "s3a://my_s3_bucket/delta/"
df_chkpt_del = (
spark.read.format("parquet")
.load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
.where(F.col("remove").isNotNull())
.select("remove.*")
.withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp")/1000))
.withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
.where(F.col("delDateDiffDays") < -7 )
)
这里有很多选择。其中之一可能是:
df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)
我可以阅读的地方 files_to_delete.csv
然后使用一个简单的bash for
循环将每个Parquet文件s3路径传递给 aws s3 rm
命令逐个删除文件。
这可能比 vacuum()
,但至少它在工作时不会消耗集群资源。
如果我这样做,我是否也必须:
写一个新的 _delta_log/000000000000000#####.json
正确记录这些更改的文件?
写一个新的 000000000000000#####.checkpoint.parquet
正确记录这些更改并更改 _delta_log/_last_checkpoint
文件指向那个 checkpoint.parquet
文件?
第二种选择会更容易。
但是,如果我只是删除文件,并且不更改文件中的任何内容,将不会有任何负面影响 _delta_log
,那就最简单了。
1条答案
按热度按时间jaql4c8m1#
tldr公司。回答这个问题。
如果我手动删除了底层的parquet文件,并且没有添加新的json日志文件或添加新的.checkpoint.parquet文件,并且更改指向它的\u delta\u log/\u last\u checkpoint文件;对deltalake表有什么负面影响(如果有的话)?
是的,这可能会损坏增量表。
让我简单回答一下delta lake如何使用
_delta_log
.如果你想看这个版本
x
然后它将转到所有版本的delta日志1
至x-1
并将创建一个Parquet文件的运行总和来读取。此过程的摘要另存为.checkpoint
以后每10个版本,使这个过程的运行效率总和。我说的这个流水账是什么意思?
假设,
版本1日志显示,添加
add file_1, file_2, file_3
第二版日志说,添加delete file_1, file_2, and add file_4
因此,当阅读第2版时,总指令将是add file_1, file_2, file_3 -> delete file_1, file_2, and add file_4
因此,读取的结果文件将是文件3和文件4。如果从文件系统中删除Parquet呢?
比如在版本3中,你删除了
file_4
从文件系统。如果你不使用.vacuum
那么delta log就不会知道了file_4
如果不存在,它将尝试读取它并将失败。