对于我的一个用例,我正在使用delta lake的更改数据馈送(CDF)功能,它与CDF配合良好,但当我读取所有数据以插入gold时,它列出了所有版本,有没有一种方法可以只读取最新版本而不指定版本号或获取最新版本的方法?
return spark.read.format("delta") \
.option("readChangeFeed", "true") \
.table(tableName) \
.where(col("_change_type") != "preimage")
上面的代码块返回自开始以来所有版本的结果,我可以通过查看表并指定版本来获取最新数据,但我不知道如何在生产中启用此功能,我不想使用时间戳来获取最新版本,因为在重试的情况下,有些人可能会一天多次运行管道,如果不作为当天的第一次运行处理,这将带来数据不准确。如果你能帮忙的话,我将不胜感激。
2条答案
按热度按时间vuv7lop31#
我们可以编写一个行级修改查询,以获取增量表的不同版本。
正如Tim在an answer to a similar Stack Overflow question中发布的那样,你可以像下面这样将它作为一个流来阅读:
cyvaqqii2#
从delta.tables导入DeltaTable
table_path =“/path/to/your/delta/table”deltaTable = DeltaTable.forPath(spark,table_path)
history_df = deltaTable.history().select(“version”).orderBy(“version”,ascending=False)version = history_df.collect()[0][0]
df = spark.read.format(“delta”).option(“versionAsOf”,version).load(delta_table_path)