需要一个优雅的方式回滚三角洲湖到以前的版本。
我目前的方法如下所示:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, testFolder)
spark.read.format("delta")
.option("versionAsOf", 0)
.load(testFolder)
.write
.mode("overwrite")
.format("delta")
.save(testFolder)
这是丑陋的,因为整个数据集需要重写。似乎一些 meta更新将是足够的,没有数据I/O应该是必要的。有人知道一个更好的方法吗?
5条答案
按热度按时间qybjjes11#
从Delta Lake 0.7.0开始,您可以使用RESTORE命令回退到Delta Lake表的早期版本。这是使用时间旅行回退表的一种简单得多的方法。
Scala语言:
巨蟒
SQL:
第一个月
如果您更喜欢这样做,也可以使用
restoreToTimestamp
命令。阅读文档了解更多详细信息。rmbxnbpk2#
这里有一个很残酷的解决方案,虽然不是很理想,但是考虑到用分区覆盖一个大数据集的代价可能很高,这个简单的解决方案可能会有帮助。
如果您对所需回滚时间之后的更新不太敏感,只需删除_delta_log中所有晚于回滚时间的版本文件。未引用的文件可以稍后使用vacuum释放。
另一个保存完整历史记录的解决方案是:1)
deltaTable.delete
2)将回滚之前的所有日志按顺序(版本号递增)复制到删除日志文件的末尾,这模拟了创建delta湖直到回滚日期,但肯定不是很漂亮。yzxexxkh3#
如果您的目标是修复错误的数据,并且您对更新不是很敏感,则可以替换时间间隔。
nue99wik4#
我在Delta中遇到过类似的问题,我在一个事务中调用了多个dml操作。例如,我需要在一个事务中调用merge,然后再调用delete。因此,在这种情况下,它们必须同时成功,或者在其中任何一个失败时回滚状态。
为了解决这个问题,我备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个DML操作都成功,则丢弃以前的稳定状态,并使用_delta_log中提交的新状态,如果任何dml操作失败,则只需替换_delta_日志目录中,该目录具有您在启动事务处理之前备份的稳定状态。一旦替换为稳定状态,则只需运行vacuum以删除事务处理期间可能创建的过时文件。
bfhwhh0e5#
您应该使用时间旅行功能:https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
读取时间戳处的数据:
然后用“回滚”版本覆盖现有数据。
至于它的丑陋,我不确定我能帮上什么忙。你可以使用分区来限制数据。或者你可以找出哪些记录已经改变,只覆盖它们。