azure 如何在foreachBatch中向delta表追加记录?

utugiqy6  于 2023-05-18  发布在  其他
关注(0)|答案(1)|浏览(100)

我正在使用foreachbatch将流数据写入多个目标,它在第一次微批处理执行时工作良好。当它尝试运行第二个微批处理时,它会失败并出现以下错误。“StreamingQueryException:查询[id = 0 d8 e45 ff-4f 3a-42 c 0 - 964 d-6 f41 c93 df 801,runId = 186 a22 bf-c75 e-482 b-bd 4 b-19 b 039 a9 aaa 38]终止,异常:abfss:xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1已经存在”
下面是我使用的foreach片段。

df_new = <<<some streaming dataset>>>
  
val appId = "1dbcd4f2-eeb7-11ed-a05b-0242ac120003" 
    
df_new.writeStream.format("delta")
  .option("mergeSchema", "true").outputMode("append")
  .option("checkpointLocation", "abfss://xxx@xxxxxxxxxx.dfs.core.windows.net/checkpoint/chkdir")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF.persist()
      val fc_final= batchDF.filter(col("msg_type") === "FC" )
        .drop(columnlist_fc:_*)
      fc_final.write
       .option("txnVersion", batchId).option("txnAppId", appId)
       .save("abfss://xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1")
    
      val hb_final = batchDF.filter(col("msg_type") =!= "FC" )
        .drop(columnlist_hb:_*)
    
      hb_final.write.partitionBy("occurrence_month")
        .option("txnVersion", batchId).option("txnAppId", appId)
        .save("abfss://xxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory2")
      
      batchDF.unpersist()
    ()
    
    }.start().awaitTermination()

我错过了什么?为什么即使我指定了mode=append,它也不能将数据文件附加到delta目录。非常感谢你的帮助。

c7rzv4ha

c7rzv4ha1#

问题是,在.foreachBatch中的两个.write中,你没有指定保存模式,也就是说,批量写入是SaveMode.ErrorIfExists,这意味着如果数据存在,抛出一个错误。如果要追加数据,需要将其更改为SaveMode.Append

fc_final.write
       .mode("append")
       .option("txnVersion", batchId).option("txnAppId", appId)
       .save("abfss://xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1")

相关问题