当两个表的连接以相同的方式被分桶和排序时,为什么Spark会重新排序数据?

f0brbegy  于 2023-11-21  发布在  Apache
关注(0)|答案(1)|浏览(143)

我在做两个表的简单连接。

SELECT
    a.user_id
FROM
    table1 a
    FULL OUTER JOIN
    table2 b
        ON a.user_id = b.user_id

字符串
table1table2都是按user_id进行分桶和排序的,具有相同的桶数,代码如下。

df.write
    .mode("overwrite")
    .format("parquet")
    .bucketBy(4, "user_id")
    .sortBy("user_id")
    .option("path", "some/path/to/data/")
    .option("compression", "snappy")
    .saveAsTable("table1")


当我查看执行计划时,我发现Spark在FileScan之后仍然执行Sort步骤,我认为这是不应该的。

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#4483L]
   +- SortMergeJoin [user_id#4483L], [user_id#4485L], FullOuter
      :- Sort [user_id#4483L ASC NULLS FIRST], false, 0
      :  +- FileScan parquet default.table1[user_id#4483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4
      +- Sort [user_id#4485L ASC NULLS FIRST], false, 0
         +- FileScan parquet default.table2[user_id#4485L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4


以前,如果我不对表进行存储桶和排序,执行计划也会包含一个Exchange步骤,因此,通过存储桶,它现在消除了Exchange步骤,这当然是好的,但我希望也能消除Sort步骤。
谢谢.

**回答:**摘自Pradeep Yadav的回答

我需要在saveAsTable()之前对数据重新分区。

df.write
    .repartition(4, col("user_id"))
    .mode("overwrite")
    .format("parquet")
    .bucketBy(4, "user_id")
    .sortBy("user_id")
    .option("path", "some/path/to/data/")
    .option("compression", "snappy")
    .saveAsTable("table1")

ws51t4hk

ws51t4hk1#

你可以阅读这篇详细的文章:https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53特别是部分**什么是排序?**它应该能够回答你的查询。

相关问题