设置:
Azure Event Hub -> raw delta table -> agg1 delta table -> agg2 delta table
通过Spark结构流处理数据。
使用merge
通过foreachBatch
完成对目标增量表的更新。
在结果中我得到错误:
异常错误:检测到数据更新(例如partKey=ap-2/part-00000- 2ddcc 5 bf-a475 -4606- 82 fc-e37019793 b5 a. c 000.snappy.parquet)。当前不支持此操作。如果希望忽略更新,请将选项“ignoreChanges”设置为“true”。如果希望反映数据更新,请使用新检查点目录重新启动此查询。
基本上,我无法通过任何流读取agg 1 delta表。如果我将最后一个流从delta切换到内存,我会收到相同的错误消息。使用第一个流,我没有任何问题。
注意事项。
1.在聚合之间,我改变了粒度:agg 1增量表(截断日期到分钟)、agg 2增量表(截断日期到天)。
1.如果我关闭所有其他流,最后一个仍然无法工作
- agg 2增量表是没有数据的新表
1条答案
按热度按时间hgb9j2n61#
数据流如何在来源数据表上运作:它读取属于源表的文件。它不能处理这些文件中的更改(更新、删除)。如果发生类似的情况,您将得到上面的错误。换句话说,DDL操作修改基础文件。唯一的区别是INSERT。如果没有不同的配置,新数据将到达新文件。
要解决此问题,您需要设置一个选项:ignoreChanges为True。此选项将导致您从修改的文件中获取所有记录。因此,您将再次获取与之前相同的记录以及修改的此记录。
问题在于:我们有聚合,聚合值存储在检查点中。如果我们再次获得相同的记录(未修改),我们会将其识别为更新,并增加其分组键的聚合。
解决方案:我们不能读取agg表来进行另一个聚合。我们需要读取原始表。
参考:https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes
注意:我正在使用Databricks Runtime 10.4,所以我默认使用新的shuffle merge。