我正在通过spark结构化流向delta lake写入流。每个流式处理批包含键值(也包含一列时间戳)。delta-lake不支持在源位置使用多个相同的键进行更新(蒸汽批处理),所以我只想使用带有最新时间戳的记录来更新delta-lake。我该怎么做?
这是我正在尝试的代码片段:
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
println(s"Executing batch $batchId ...")
microBatchOutputDF.show()
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
提前谢谢。
2条答案
按热度按时间krugob8w1#
您可以从“microbatchoutputdf”Dataframe中删除具有较旧时间戳的记录&只保留具有给定键的最新时间戳的记录。
您可以使用spark的“reducebykey”操作并实现如下自定义reduce函数。
假定键的类型为string×tamp类型为timestamp。为您的流式处理“microbatchoutputdf”调用“getlatestevents”。它忽略旧的时间戳事件,只保留最新的事件。
然后在“latestrecordsdf”上调用deltalake合并操作
14ifxucb2#
在微博客的流媒体中,对于给定的密钥,可能会有多条记录。为了用target表更新它,您必须找出microbatch中键的最新记录。在您的例子中,您可以使用max of timestamp列和value列来查找最新的记录,并将其用于合并操作。
有关查找给定密钥的最新记录的详细信息,请参阅此链接。