我有一个笔记本,它使用类似于以下语句的语句来编写增量表:
match = "current.country = updates.country and current.process_date = updates.process_date"
deltaTable = DeltaTable.forPath(spark, silver_path)
deltaTable.alias("current")\
.merge(
data.alias("updates"),
match) \
.whenMatchedUpdate(
set = update_set,
condition = condition) \
.whenNotMatchedInsert(values = values_set)\
.execute()
多任务作业具有两个并行执行的任务。x1c 0d1x
执行作业时,显示以下错误:
**ConcurrentAppendException:**文件被并发更新添加到分区[country=帕纳马,process_date=2022-01-01 00:00:00]。请重试该操作。
在每个任务中,我发送了不同的国家(巴拿马、厄瓜多尔)和相同的日期作为参数,因此在执行时,只需要写入与所发送的国家相对应的信息。该delta表由country和process_date字段进行分区。您知道我做错了什么吗?在使用“merge”语句时,我应该如何指定受影响的分区?
如果您能说明在这些情况下我应该如何使用分区,我将不胜感激,因为这对我来说是新的。
**更新:**我在条件中进行了调整,以根据此处指示的内容指定国家/地区和处理日期(ConcurrentAppendException)。现在我收到以下错误消息:
**ConcurrentAppendException:**并发更新已将文件添加到表的根目录。请重试该操作。
我想不出是什么原因导致错误。继续调查。
2条答案
按热度按时间yhxst69z1#
错误-并发追加异常:文件已通过并发更新添加到表的根目录。请重试该操作。
此异常错误通常在并发DELETE、UPDATE或MERGE操作期间引发。虽然并发操作可能物理更新不同的分区目录,但其中一个操作可能读取另一个操作并发更新的同一分区,从而导致冲突。通过在操作条件中明确分隔,可以避免这种情况。
在Map中使用'Update Strategy'转换时,将对Delta Lake目标表执行更新查询。对同一个目标表使用多个Update Strategy转换时,将并行执行多个UPDATE查询,因此,目标数据将不可预测。由于并发UPDATE查询的Delta Lake目标中存在不可预测的数据方案,**不支持在Map中对每个“数据块Delta Lake表”使用多个“更新策略”转换。**请重新设计Map,使每个Delta Lake表都有一个“更新策略”转换。
解决方案-
在对每个Databricks Delta Lake表使用一个“更新策略”转换运行Map时,执行将成功完成。
请参阅-https://docs.delta.io/latest/concurrency-control.html#avoid-conflicts-using-partitioning-and-disjoint-command-conditions
ktecyv1j2#
最初,受影响的表只有一个日期字段作为分区。所以我用国家和日期字段对其进行分区。这个新分区创建了国家和日期目录,但日期分区的旧目录仍然存在,没有被删除。
很显然,当试图同时读取这些目录时,这些目录导致了冲突。我在另一个路径上创建了一个新的增量,并使用正确的分区,然后将其替换到原始路径上。这允许旧的分区目录被删除。
执行这些操作的唯一后果是我丢失了表的更改历史(时间旅行)。