scala 三角洲湖回退

irtuqstp  于 2022-12-23  发布在  Scala
关注(0)|答案(5)|浏览(127)

需要一个优雅的方式回滚三角洲湖到以前的版本。
我目前的方法如下所示:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(testFolder)
  .write
  .mode("overwrite")
  .format("delta")
  .save(testFolder)

这是丑陋的,因为整个数据集需要重写。似乎一些 meta更新将是足够的,没有数据I/O应该是必要的。有人知道一个更好的方法吗?

qybjjes1

qybjjes11#

从Delta Lake 0.7.0开始,您可以使用RESTORE命令回退到Delta Lake表的早期版本。这是使用时间旅行回退表的一种简单得多的方法。

Scala语言:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

巨蟒

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

SQL

第一个月
如果您更喜欢这样做,也可以使用restoreToTimestamp命令。阅读文档了解更多详细信息。

rmbxnbpk

rmbxnbpk2#

这里有一个很残酷的解决方案,虽然不是很理想,但是考虑到用分区覆盖一个大数据集的代价可能很高,这个简单的解决方案可能会有帮助。
如果您对所需回滚时间之后的更新不太敏感,只需删除_delta_log中所有晚于回滚时间的版本文件。未引用的文件可以稍后使用vacuum释放。
另一个保存完整历史记录的解决方案是:1)deltaTable.delete 2)将回滚之前的所有日志按顺序(版本号递增)复制到删除日志文件的末尾,这模拟了创建delta湖直到回滚日期,但肯定不是很漂亮。

yzxexxkh

yzxexxkh3#

如果您的目标是修复错误的数据,并且您对更新不是很敏感,则可以替换时间间隔。

df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
      .save("/delta/events")
nue99wik

nue99wik4#

我在Delta中遇到过类似的问题,我在一个事务中调用了多个dml操作。例如,我需要在一个事务中调用merge,然后再调用delete。因此,在这种情况下,它们必须同时成功,或者在其中任何一个失败时回滚状态。
为了解决这个问题,我备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个DML操作都成功,则丢弃以前的稳定状态,并使用_delta_log中提交的新状态,如果任何dml操作失败,则只需替换_delta_日志目录中,该目录具有您在启动事务处理之前备份的稳定状态。一旦替换为稳定状态,则只需运行vacuum以删除事务处理期间可能创建的过时文件。

bfhwhh0e

bfhwhh0e5#

您应该使用时间旅行功能:https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
读取时间戳处的数据:

val inputPath = "/path/to/my/table@20190101000000000"

然后用“回滚”版本覆盖现有数据。
至于它的丑陋,我不确定我能帮上什么忙。你可以使用分区来限制数据。或者你可以找出哪些记录已经改变,只覆盖它们。

相关问题