在Spark查询中比较物理计划与PushedFilters和PartitionFilters

fjnneemd  于 2023-03-03  发布在  Apache
关注(0)|答案(1)|浏览(185)

我已经想到了三种稍微不同的方法来查询Spark表,以过滤日期范围的数据(我主要使用它来查询一天的数据)。当我回顾这三种方法的物理计划时,它们略有不同,我不确定哪一个是最好的(在计算效率和Spark内存使用方面)。我希望一些Spark的StackOverflowMaven能够提供建议
1.哪一个(基于内部连接的过滤器、基于常规列的过滤器或基于分区列的过滤器)是最好的
1.如何在Spark中对这样的测试/方法进行基准测试(例如,在Python中,我会使用timeit并运行100-1000次相同的代码来测量时间性能。我想知道如何在Spark中进行基准测试(因为每次迭代都将是昂贵和漫长的)。
我在下面的代码中简化并添加了很多注解,以解释我正在尝试测试的内容。提前感谢您的回答/建议!

// 'calendar' table used below has just one column 'calendar_day' which has dates spanning from Jan 1, 1970 to Dec 31, 2050
// For fair comparison, we will select only one day from the calendar table like below
val start_day = LocalDate.parse("2023-01-07", DateTimeFormatter.ofPattern("yyyy-MM-dd"))
val end_day = LocalDate.parse("2023-01-07", DateTimeFormatter.ofPattern("yyyy-MM-dd"))
val start_day_literal = lit(Timestamp.valueOf(start_day.atStartOfDay())).cast("timestamp")
val end_day_literal = lit(Timestamp.valueOf(end_day.atStartOfDay())).cast("timestamp")
val t_dates = calendar 
.filter(col("calendar_day") >= start_day_literal)
.filter(col("calendar_day") <= end_day_literal)
.select("calendar_day")
t_dates.show(10, false);
+-------------------+
|calendar_day       |
+-------------------+
|2023-01-07 00:00:00|
+-------------------+

// This is our test#1
// Here, we'll use the broadcast join with t_dates.calendar_day 
// to filter out the rows from the data table.
val test_one = spark
.table("my_data_table")
.withColumn("customer_id", col("beneficiary_external_id").cast("long"))
.withColumn("start_date_trunc", col("start_date").cast("date"))
.withColumn("end_date_nvl", expr("nvl(cast(end_date as date), to_timestamp('2050-12-31', 'yyyy-MM-dd'))"))
.join(
    broadcast(t_dates).as("td"),
    joinExprs = col("calendar_day").between(
        lowerBound = col("start_date_trunc"),
        upperBound = col("end_date_nvl")
    ),
    joinType = "inner"
)
.select(
    col("customer_id"),
    col("start_date"),
    col("start_date_trunc"),
    col("start_day"),
    col("end_date"),
    col("end_date_nvl")
)
//test_one.show(100, false)
test_one.explain()
test_one.count(); // 1,999,005,179 rows

== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [customer_id#163L, start_date#114, start_date_trunc#182, start_day#128, end_date#115, end_date_nvl#202]
   +- BroadcastNestedLoopJoin BuildRight, Inner, ((calendar_day#0 >= cast(start_date_trunc#182 as timestamp)) && (calendar_day#0 <= end_date_nvl#202))
      :- Project [start_date#114, end_date#115, start_day#128, cast(beneficiary_external_id#113 as bigint) AS customer_id#163L, cast(start_date#114 as date) AS start_date_trunc#182, coalesce(cast(cast(end_date#115 as date) as timestamp), 2556057600000000) AS end_date_nvl#202]
      :  +- Filter isnotnull(cast(start_date#114 as date))
      :     +- FileScan parquet my_data_table[beneficiary_external_id#113,start_date#114,end_date#115,start_day#128] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://aac161b3-4383-880d-29a9-bd3d8e0115d0/9496ff37-0aaf-4b35-a0bc-5363a9213205], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<beneficiary_external_id:string,start_date:timestamp,end_date:timestamp>
      +- BroadcastExchange IdentityBroadcastMode
         +- Filter ((calendar_day#0 >= 1673049600000000) && (calendar_day#0 <= 1673049600000000))
            +- FileScan parquet [calendar_day#0] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://my-bucket/calendar_days/], PartitionFilters: [], PushedFilters: [GreaterThanOrEqual(calendar_day,2023-01-07 00:00:00.0), LessThanOrEqual(calendar_day,2023-01-07 ..., ReadSchema: struct<calendar_day:timestamp>
test_one: org.apache.spark.sql.DataFrame = [customer_id: bigint, start_date: timestamp ... 4 more fields]
res4: Long = 1999005179


// For test_two, we will create a (mock) timestamp object for Jan 7, 2023, 
// which is the same date that we used for the test_one above.
// This is because we don't want to use inner join with t_dates table.
// Instead, we will replace inner join with filters as shown below.
val dataset_timestamp = Timestamp.valueOf("2023-01-07 00:00:00.0")

val test_two = spark
.table("my_data_table")
.withColumn("customer_id", col("beneficiary_external_id").cast("long"))
.withColumn("start_date_trunc", col("start_date").cast("date"))
.withColumn("end_date_nvl", expr("nvl(cast(end_date as date), to_timestamp('2050-12-31', 'yyyy-MM-dd'))"))
.filter(
    col("start_date_trunc") <= dataset_timestamp
    )
.filter(
    col("end_date_nvl") >= dataset_timestamp
    )
.select(
    col("customer_id"),
    col("start_date"),
    col("start_date_trunc"),
    col("start_day"),
    col("end_date"),
    col("end_date_nvl")
)
//test_two.show(100, false)
test_two.explain()
test_two.count(); // 1,999,005,179 rows; same as test_one

== Physical Plan ==
*(1) Project [cast(beneficiary_external_id#1 as bigint) AS customer_id#51L, start_date#2, cast(start_date#2 as date) AS start_date_trunc#70, start_day#16, end_date#3, coalesce(cast(cast(end_date#3 as date) as timestamp), 2556057600000000) AS end_date_nvl#90]
+- *(1) Filter ((isnotnull(start_date#2) && (cast(cast(start_date#2 as date) as timestamp) <= 1673049600000000)) && (coalesce(cast(cast(end_date#3 as date) as timestamp), 2556057600000000) >= 1673049600000000))
   +- *(1) FileScan parquet my_data_table[beneficiary_external_id#1,start_date#2,end_date#3,start_day#16] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://aac161b3-4383-880d-29a9-bd3d8e0115d0/9496ff37-0aaf-4b35-a0bc-5363a9213205], PartitionFilters: [], PushedFilters: [IsNotNull(start_date)], ReadSchema: struct<beneficiary_external_id:string,start_date:timestamp,end_date:timestamp>
dataset_timestamp: java.sql.Timestamp = 2023-01-07 00:00:00.0
test_two: org.apache.spark.sql.DataFrame = [customer_id: bigint, start_date: timestamp ... 4 more fields]
res1: Long = 1999005179


// Like test_two, we will create a (mock) timestamp object for Jan 7, 2023 for test_three.
// The  only difference between test_two and test_three is that we are now using start_day', 
// which is a partition column in the data source, in one of the filter statements.
val dataset_timestamp = Timestamp.valueOf("2023-01-07 00:00:00.0")

val test_three = spark
.table("my_data_table")
.withColumn("customer_id", col("beneficiary_external_id").cast("long"))
.withColumn("start_date_trunc", col("start_date").cast("date"))
.withColumn("end_date_nvl", expr("nvl(cast(end_date as date), to_timestamp('2050-12-31', 'yyyy-MM-dd'))"))
.filter(
    col("start_day") <= dataset_timestamp // This is the only difference between test_two and test_three
    )
.filter(
    col("end_date_nvl") >= dataset_timestamp
    )
.select(
    col("customer_id"),
    col("start_date"),
    col("start_date_trunc"),
    col("start_day"),
    col("end_date"),
    col("end_date_nvl")
)
//test_three.show(100, false)
test_three.explain()
test_three.count(); // 1,999,005,179 rows; same as test_one and test_two

== Physical Plan ==
*(1) Project [cast(beneficiary_external_id#130 as bigint) AS customer_id#163L, start_date#131, cast(start_date#131 as date) AS start_date_trunc#182, start_day#145, end_date#132, coalesce(cast(cast(end_date#132 as date) as timestamp), 2556057600000000) AS end_date_nvl#202]
+- *(1) Filter (coalesce(cast(cast(end_date#132 as date) as timestamp), 2556057600000000) >= 1673049600000000)
   +- *(1) FileScan parquet my_data_table[beneficiary_external_id#130,start_date#131,end_date#132,start_day#145] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3://aac161b3-4383-880d-29a9-bd3d8e0115d0/9496ff37-0aaf-4b35-a0bc-5363..., PartitionFilters: [(start_day#145 <= 1673049600000000)], PushedFilters: [], ReadSchema: struct<beneficiary_external_id:string,start_date:timestamp,end_date:timestamp>
dataset_timestamp: java.sql.Timestamp = 2023-01-07 00:00:00.0
test_three: org.apache.spark.sql.DataFrame = [customer_id: bigint, start_date: timestamp ... 4 more fields]
res2: Long = 1999005179
0pizxfdo

0pizxfdo1#

我认为最好的办法是,按顺序:三、二、一
方法3:正如你在执行计划中所看到的,你的过滤器被推到了分区,这意味着执行器不会加载不符合过滤器的数据/文件。这是Spark中最好的做法。请尽快尝试在分区列上过滤。
方法2:像approche 3,但没有分区过滤器。执行器将加载数据,这将是过滤后。
方法1:有趣的是,没有分区过滤器。你使用一个broadcast join,它不会引起shuffle,这是一个很好的观点。但是我看不出你如何能击败方法3,因为spark无论如何都必须执行一个比较。
不要使用基于代码的解决方案来比较执行时间。相反,看看SparkUI执行时间,并确保在比较之前,您的资源管理器(Yarn,k8s,...)给你相同数量的资源。

相关问题