我有一个冰山表创建
CREATE TABLE catalog.db.table (a int, b int) USING iceberg
然后对它应用某种排序顺序
ALTER TABLE catalog.db.table WRITE ORDERED BY (a, b)
调用最后一个命令后,SHOW TBLPROPERTIES catalog.db.table
开始显示write.distribution-mode: range
属性:
|sort-order |a ASC NULLS FIRST, b ASC NULLS FIRST|
|write.distribution-mode|range |
现在我正在向表中写入数据:
df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()
我认为这应该在spark中创建一个单一的任务,该任务将对dataframe中的所有数据进行排序(事实上,从创建开始就已经排序了),然后将其作为一个单一的文件插入到表中。
不幸的是,在写这篇文章的时候,spark决定重新划分所有导致 Shuffle 的数据,我相信这是由于write.distribution-mode: range
自动设置的。
== Physical Plan ==
AppendData (6)
+- * Sort (5)
+- Exchange (4) # :(
+- * Project (3)
+- Coalesce (2)
+- * Scan ExistingRDD (1)
是否有一种方法既可以插入新数据,又可以避免不必要的乱序?
1条答案
按热度按时间pgvzfuti1#
根据Apache Iceberg文档,
WRITE ORDERED BY
执行以下操作:Iceberg表可以配置一个排序顺序,用于自动排序某些引擎中写入表中的数据。例如,Spark中的MERGE INTO将使用表排序。
现在,您使用以下内容创建并编写表:
对 Dataframe 排序需要一个shuffle操作。你已经使用了
sortWithinPartitions
,它确实对你的数据进行了排序,但是只在你的分区内。所以这并没有像冰山表所要求的那样完成完整的排序操作。因此,您需要另一个全混洗操作来完成完整的排序。