Apache Spark 插入排序的冰山表时避免乱序

lzfw57am  于 2023-01-02  发布在  Apache
关注(0)|答案(1)|浏览(212)

我有一个冰山表创建

CREATE TABLE catalog.db.table (a int, b int) USING iceberg

然后对它应用某种排序顺序

ALTER TABLE catalog.db.table WRITE ORDERED BY (a, b)

调用最后一个命令后,SHOW TBLPROPERTIES catalog.db.table开始显示write.distribution-mode: range属性:

|sort-order             |a ASC NULLS FIRST, b ASC NULLS FIRST|
|write.distribution-mode|range                               |

现在我正在向表中写入数据:

df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()

我认为这应该在spark中创建一个单一的任务,该任务将对dataframe中的所有数据进行排序(事实上,从创建开始就已经排序了),然后将其作为一个单一的文件插入到表中。
不幸的是,在写这篇文章的时候,spark决定重新划分所有导致 Shuffle 的数据,我相信这是由于write.distribution-mode: range自动设置的。

== Physical Plan ==
AppendData (6)
+- * Sort (5)
   +- Exchange (4)    # :(
      +- * Project (3)
         +- Coalesce (2)
            +- * Scan ExistingRDD (1)

是否有一种方法既可以插入新数据,又可以避免不必要的乱序?

pgvzfuti

pgvzfuti1#

根据Apache Iceberg文档,WRITE ORDERED BY执行以下操作:
Iceberg表可以配置一个排序顺序,用于自动排序某些引擎中写入表中的数据。例如,Spark中的MERGE INTO将使用表排序。
现在,您使用以下内容创建并编写表:

df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()

对 Dataframe 排序需要一个shuffle操作。你已经使用了sortWithinPartitions,它确实对你的数据进行了排序,但是只在你的分区内。所以这并没有像冰山表所要求的那样完成完整的排序操作。
因此,您需要另一个全混洗操作来完成完整的排序。

相关问题