scala 连接中的Spark性能差异

4c8rllxm  于 2023-06-23  发布在  Scala
关注(0)|答案(1)|浏览(190)

我有2个 Dataframe ,并在列partition_column上进行分区,我在加入 Dataframe 时观察到以下2种方法之间的性能差异。
假设2个 Dataframe 是df1和df2,并且两者都在partition_column上分区。
1.使用Spark并行连接 Dataframe 。

val writeDF = df1.as("df1").join(df2.as("df2"),
            col("df1.partition_column").equalTo("df2.partition_column")
            and col("df1.unique_id").equalTo(col("df2.unique_id"))).select("df1.*")
writeDF.write.mode(SaveMode.Overwrite).partitionBy("partition_column").parquet("some location")

我检查了Spark计划,它使用分区过滤器,但这项工作慢相比,下面的方法。
1.通过在组中提交分区来加入 Dataframe 。

val partitions = df2.select("partition_column").distinct.collect() //i have maximum 500 partitions
    partitions.grouped(5).foreach(batches=>{
        batches.par.foreach(row=>{
          val partitionKey = row.getAs[Double]("partition_column")
          val writeDF = df1.as("df1").join(df2.as("df2"),
            col("df1.partition_column").equalTo(partitionKey)
            and col("df2.partition_column").equalTo(partitionKey)
            and col("df1.unique_id").equalTo(col("df2.unique_id"))).select("df1.*")
writeDF.write.mode(SaveMode.Overwrite).parquet(s"somebase_location/partition_column=${partitionKey}/")
        })
    })

这种方法在他们的计划中也使用了分区过滤器。
我观察到的一件事是资源利用率,在方法1中,资源利用率是30%,但在方法2中,它大约是75%。
我的问题是:
1.为什么2种方法比1种方法快?
1.因为我需要从1个 Dataframe 中的数据,我应该去过滤的方法。

xwbd5t1u

xwbd5t1u1#

我不知道你真正的问题是什么,但是第二种方法会占用更多的内存,这是有道理的,它基本上做了大约500次几乎相同的连接。让我对这两个查询感到好奇的一件事是,当我们只需要一个表的列时,就不需要连接了,你可能也想试试这个:

df2.registerTempTable("df2")

val writeDF = df1.as("df1").where(
  expr("exists (select true from df2 where df2.partition_column = df1.partition_column and df2.unique_id = df1.unique_id)")
)

或者如果你想使用spark API,我认为与EXISTS最相似的API是left semi join:

df1.join(
  df2,
  df1("partition_column") === df2("partition_column") and df1("unique_id") === df2("unique_id"),
  "left_semi"
)

相关问题