我正在使用foreachbatch将流数据写入多个目标,它在第一次微批处理执行时工作良好。当它尝试运行第二个微批处理时,它会失败并出现以下错误。“StreamingQueryException:查询[id = 0 d8 e45 ff-4f 3a-42 c 0 - 964 d-6 f41 c93 df 801,runId = 186 a22 bf-c75 e-482 b-bd 4 b-19 b 039 a9 aaa 38]终止,异常:abfss:xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1已经存在”
下面是我使用的foreach片段。
df_new = <<<some streaming dataset>>>
val appId = "1dbcd4f2-eeb7-11ed-a05b-0242ac120003"
df_new.writeStream.format("delta")
.option("mergeSchema", "true").outputMode("append")
.option("checkpointLocation", "abfss://xxx@xxxxxxxxxx.dfs.core.windows.net/checkpoint/chkdir")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
val fc_final= batchDF.filter(col("msg_type") === "FC" )
.drop(columnlist_fc:_*)
fc_final.write
.option("txnVersion", batchId).option("txnAppId", appId)
.save("abfss://xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1")
val hb_final = batchDF.filter(col("msg_type") =!= "FC" )
.drop(columnlist_hb:_*)
hb_final.write.partitionBy("occurrence_month")
.option("txnVersion", batchId).option("txnAppId", appId)
.save("abfss://xxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory2")
batchDF.unpersist()
()
}.start().awaitTermination()
我错过了什么?为什么即使我指定了mode=append,它也不能将数据文件附加到delta目录。非常感谢你的帮助。
1条答案
按热度按时间c7rzv4ha1#
问题是,在
.foreachBatch
中的两个.write
中,你没有指定保存模式,也就是说,批量写入是SaveMode.ErrorIfExists
,这意味着如果数据存在,抛出一个错误。如果要追加数据,需要将其更改为SaveMode.Append
: