I am using Spark 3.x, and I have the following simple test to learn the Spark SQL bucketing feature.
test("bucket join 1") {
val spark = SparkSession.builder().master("local").enableHiveSupport().appName("test join 1").config("spark.sql.codegen.wholeStage", "false").getOrCreate()
import spark.implicits._
val data1 = (0 to 100).map {
i => (i, ('A' + i % 6).asInstanceOf[Char].toString)
}
val t1 = "t_" + System.currentTimeMillis()
data1.toDF("a", "b").write.bucketBy(2, "b").saveAsTable(t1)
val data2 = (0 to 5).map {
i => (('A' + i % 6).asInstanceOf[Char].toString, ('A' + i % 6).asInstanceOf[Char].toString)
}
val t2 = "t_" + System.currentTimeMillis()
data2.toDF("a", "b").write.bucketBy(2, "b").saveAsTable(t2)
val df = spark.sql(
s"""
select t1.a ,t1.b,t2.a, t2.b from $t1 t1 join $t2 t2 on t1.b = t2.b
""".stripMargin(' '))
df.explain(true)
df.show(truncate = false)
spark.sql(s"describe extended $t1 ").show(truncate = false)
spark.sql(s"describe extended $t2 ").show(truncate = false)
}
When I run the code above, it prints the physical plan shown below, which clearly involves an exchange, so I think bucketing is not taking effect here.
From the output of the describe command I can see that both tables have a bucket spec defined:
Bucket Columns: b
Num Buckets: 2
I don't know what I am missing; I am not getting the result I expected (the shuffle should be avoided). The physical plan is:
BroadcastHashJoin [b#27], [b#29], Inner, BuildRight
:- Project [a#26, b#27]
: +- Filter isnotnull(b#27)
: +- FileScan parquet default.t_1700986792232[a#26,b#27] Batched: false, DataFilters: [isnotnull(b#27)], Format: Parquet, Location: InMemoryFileIndex[file:/D://spark-warehouse/t_17009867..., PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string>, SelectedBucketsCount: 2 out of 2
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#35]
+- Project [a#28, b#29]
+- Filter isnotnull(b#29)
+- FileScan parquet default.t_1700986813106[a#28,b#29] Batched: false, DataFilters: [isnotnull(b#29)], Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark-warehouse/t_17009868..., PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:string,b:string>, SelectedBucketsCount: 2 out of 2
1 Answer
The row counts of your DataFrames lead Catalyst to conclude that a broadcast hash join is the cheaper strategy; that is simply how its cost-based choice works.
1. Try the following, so that the bucketed (sort-merge) join is used:
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
2. Alternatively, try roughly 1M rows for both DFs, with bucketing.

As you can see in the sketch below.
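A minimal sketch of the first suggestion, reusing the `spark` session and the `$t1` / `$t2` table names from the question (an illustration under those assumptions, not the answerer's exact code):

// Disable the broadcast threshold so Catalyst stops preferring a broadcast hash join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val df2 = spark.sql(
  s"select t1.a, t1.b, t2.a, t2.b from $t1 t1 join $t2 t2 on t1.b = t2.b")

// With both tables bucketed into the same number of buckets on the join key `b`,
// the plan should now show a SortMergeJoin over the bucketed FileScans with no
// Exchange on either side (Sort nodes remain because the tables were not written with sortBy).
df2.explain(true)

Setting the threshold to -1 disables broadcast joins for the whole session; the second suggestion achieves the same effect by simply making both inputs larger than the default 10 MB broadcast threshold.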
You may also want to consider the Delta format together with Z-Ordering.
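If you go that route, a rough sketch could look like the following; it assumes the Delta Lake library and its SQL extensions are configured for the session (not part of the original question), and the table name is hypothetical:

// Write one of the tables in Delta format.
data1.toDF("a", "b").write.format("delta").saveAsTable("t1_delta")

// Cluster the data files on the join column via Delta Lake's OPTIMIZE ... ZORDER BY.
spark.sql("OPTIMIZE t1_delta ZORDER BY (b)")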