我在做两个表的简单连接。
SELECT
a.user_id
FROM
table1 a
FULL OUTER JOIN
table2 b
ON a.user_id = b.user_id
字符串table1
和table2
都是按user_id
进行分桶和排序的,具有相同的桶数,代码如下。
df.write
.mode("overwrite")
.format("parquet")
.bucketBy(4, "user_id")
.sortBy("user_id")
.option("path", "some/path/to/data/")
.option("compression", "snappy")
.saveAsTable("table1")
型
当我查看执行计划时,我发现Spark在FileScan
之后仍然执行Sort
步骤,我认为这是不应该的。
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#4483L]
+- SortMergeJoin [user_id#4483L], [user_id#4485L], FullOuter
:- Sort [user_id#4483L ASC NULLS FIRST], false, 0
: +- FileScan parquet default.table1[user_id#4483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4
+- Sort [user_id#4485L ASC NULLS FIRST], false, 0
+- FileScan parquet default.table2[user_id#4485L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4
型
以前,如果我不对表进行存储桶和排序,执行计划也会包含一个Exchange
步骤,因此,通过存储桶,它现在消除了Exchange
步骤,这当然是好的,但我希望也能消除Sort
步骤。
谢谢.
**回答:**摘自Pradeep Yadav的回答
我需要在saveAsTable()
之前对数据重新分区。
df.write
.repartition(4, col("user_id"))
.mode("overwrite")
.format("parquet")
.bucketBy(4, "user_id")
.sortBy("user_id")
.option("path", "some/path/to/data/")
.option("compression", "snappy")
.saveAsTable("table1")
型
1条答案
按热度按时间ws51t4hk1#
你可以阅读这篇详细的文章:https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53特别是部分**什么是排序?**它应该能够回答你的查询。