stream对delta-lake具有多个相同的密钥

4nkexdtk  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(439)

我正在通过spark结构化流向delta lake写入流。每个流式处理批包含键值(也包含一列时间戳)。delta-lake不支持在源位置使用多个相同的键进行更新(蒸汽批处理),所以我只想使用带有最新时间戳的记录来更新delta-lake。我该怎么做?
这是我正在尝试的代码片段:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {

  println(s"Executing batch $batchId ...")
  microBatchOutputDF.show()

  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

提前谢谢。

krugob8w

krugob8w1#

您可以从“microbatchoutputdf”Dataframe中删除具有较旧时间戳的记录&只保留具有给定键的最新时间戳的记录。
您可以使用spark的“reducebykey”操作并实现如下自定义reduce函数。

def getLatestEvents(input: DataFrame) : RDD[Row] = {
input.rdd.map(x => (x.getAs[String]("key"), x)).reduceByKey(reduceFun).map(_._2) }

def reduceFun(x: Row, y: Row) : Row = {
if (x.getAs[Timestamp]("timestamp").getTime > y.getAs[Timestamp]("timestamp").getTime) x else y }

假定键的类型为string&timestamp类型为timestamp。为您的流式处理“microbatchoutputdf”调用“getlatestevents”。它忽略旧的时间戳事件,只保留最新的事件。

val latestRecordsDF = spark.createDataFrame(getLatestEvents(microBatchOutputDF), <schema of DF>)

然后在“latestrecordsdf”上调用deltalake合并操作

14ifxucb

14ifxucb2#

在微博客的流媒体中,对于给定的密钥,可能会有多条记录。为了用target表更新它,您必须找出microbatch中键的最新记录。在您的例子中,您可以使用max of timestamp列和value列来查找最新的记录,并将其用于合并操作。
有关查找给定密钥的最新记录的详细信息,请参阅此链接。

相关问题