Apache Spark 如何在合并时手动将时间戳应用于增量表版本?

bjp0bcyl  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(107)

我正在使用以下内容不断更新数据块中的增量表:

delta_table = DeltaTable.forPath(spark, delta_path)
delta_table.alias("t").merge(
  df.alias("s"), ... # join condition
).whenMatchedUpdate(
  set=... # update columns
).whenNotMatchedInsert(
  values=... # insert columns
).execute()

稍后,我想使用时间戳查询表的不同版本。例如:

df = spark.read.format(
  "delta"
).option(
  "timestampAsOf",
  "2022-11-14 12:00:00",
).load(
  "s3://BUCKET/PREFIX/"
)

但是,这里的时间戳对应于合并的确切时间戳。(例如,如果是上周转储),是否有办法将时间戳设置为关联的时间戳,而不是将其设置为具体执行合并时的任何时间?如果要重新执行合并,这将特别有用。例如摄取数据等。

von4xj4u

von4xj4u1#

不知道有没有这样做的选择。但有其他可能相关的选择。
1.将用户定义的元数据添加到合并提交中,您可以在此处记录自己的时间戳。然后,您可以使用describe history查看此时间戳。从数据库文档中:

df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/tmp/delta/people10m")

1.只需创建一个日志/表,在其中记录表名、版本和所需的时间戳。
一个相关的提示,默认情况下,你可以时间旅行的能力,你的增量表只保证最多30天。这可以配置使用表属性delta.logRetentionDurationdelta.deletedFileRetentionDuration,真空操作(用于清理旧版本的表的 parquet 文件)也在这里发挥作用。请参阅docs

相关问题