我有2个 Dataframe ,并在列partition_column
上进行分区,我在加入 Dataframe 时观察到以下2种方法之间的性能差异。
假设2个 Dataframe 是df1和df2,并且两者都在partition_column
上分区。
1.使用Spark并行连接 Dataframe 。
val writeDF = df1.as("df1").join(df2.as("df2"),
col("df1.partition_column").equalTo("df2.partition_column")
and col("df1.unique_id").equalTo(col("df2.unique_id"))).select("df1.*")
writeDF.write.mode(SaveMode.Overwrite).partitionBy("partition_column").parquet("some location")
我检查了Spark计划,它使用分区过滤器,但这项工作慢相比,下面的方法。
1.通过在组中提交分区来加入 Dataframe 。
val partitions = df2.select("partition_column").distinct.collect() //i have maximum 500 partitions
partitions.grouped(5).foreach(batches=>{
batches.par.foreach(row=>{
val partitionKey = row.getAs[Double]("partition_column")
val writeDF = df1.as("df1").join(df2.as("df2"),
col("df1.partition_column").equalTo(partitionKey)
and col("df2.partition_column").equalTo(partitionKey)
and col("df1.unique_id").equalTo(col("df2.unique_id"))).select("df1.*")
writeDF.write.mode(SaveMode.Overwrite).parquet(s"somebase_location/partition_column=${partitionKey}/")
})
})
这种方法在他们的计划中也使用了分区过滤器。
我观察到的一件事是资源利用率,在方法1中,资源利用率是30%,但在方法2中,它大约是75%。
我的问题是:
1.为什么2种方法比1种方法快?
1.因为我需要从1个 Dataframe 中的数据,我应该去过滤的方法。
1条答案
按热度按时间xwbd5t1u1#
我不知道你真正的问题是什么,但是第二种方法会占用更多的内存,这是有道理的,它基本上做了大约500次几乎相同的连接。让我对这两个查询感到好奇的一件事是,当我们只需要一个表的列时,就不需要连接了,你可能也想试试这个:
或者如果你想使用spark API,我认为与
EXISTS
最相似的API是left semi join: