pyspark Spark AQE动态聚结后置过滤器无混洗

0h4hbjxa  于 2023-08-02  发布在  Spark
关注(0)|答案(2)|浏览(134)

我有以下功能:

def calculate_delta(old_df: DataFrame,
                    new_df: DataFrame,
                    id_keys: List[str],
                    deletion_field_name: str,
                    excluded_fields: List[str] = None) -> DataFrame:
    # make sure both the new_df and old_df have the same columns order
    if excluded_fields:
        old_df = old_df.drop(*excluded_fields)
    shared_columns = set(old_df.columns).intersection(new_df.columns)
    new_df = new_df.select(*id_keys, f.struct(*shared_columns).alias("new_struct"))
    old_df = old_df.select(*id_keys, f.struct(*shared_columns).alias("old_struct"))

    return new_df.join(old_df, on=id_keys, how="outer") \
        .filter("old_struct IS NULL OR new_struct IS NULL OR new_struct != old_struct") \
        .withColumn(deletion_field_name, f.expr("new_struct is null"))\
        .withColumn("output_struct", f.when(f.col("new_struct").isNull(), f.col("old_struct")).otherwise(f.col("new_struct")))\
        .selectExpr(f"(new_struct IS NULL) as {deletion_field_name}",
                    "output_struct.*")

字符串
该函数采用 Dataframe 的2个版本,并且仅保留修改/删除的行。后连接有大约1.5亿行,后过滤器它真的取决于,可以有50行,可以有50 M行,这取决于当天发生了什么变化。
我设置了"spark.sql.shuffle.partitions": "720"
问题是,在使用该函数后,我正在将数据写入文件,并且它总是尝试写入720个文件(有时根据是否有空分区而写入较少),这是很多,我想使用AQE动态合并功能来减少分区数量,从而动态写入文件的数量。我真的不能合并/重新分区到一个静态的数量,因为它是动态的,有时会有50修改/删除行,有时可能有50 M行。我还尝试了在写之前通过id键重新分区,以强制AQE执行动态合并,它似乎工作正常,但我真的想避免这种解决方案,因为它是一个冗余的 Shuffle
所以基本上我想要的是动态地减少输出文件的数量,如果可能的话,有什么建议吗?
我用的是Spark3.3
谢啦,谢啦

8yparm6h

8yparm6h1#

确保底层配置参数设置为true

  • spark.sql.adaptive.enabled
  • spark.sql.adaptive.coalescePartitions.enabled

在Spark v3.3.0中,默认情况下应该是这样。
另外,请确保将以下配置参数设置为false

  • spark.sql.adaptive.coalescePartitions.parallelismFirst

在Spark v3.3.0中,默认情况下不是
关于spark.sql.adaptive.coalescePartitions.parallelismFirst的文档:
当为true时,Spark在合并连续的shuffle分区时忽略spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小(默认为64MB),并且只考虑spark.sql.adaptive.coalescePartitions.minPartitionSize指定的最小分区大小(默认为1MB),以最大化并行度。这是为了在启用自适应查询执行时避免性能下降。建议将此配置设置为false,并遵守spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小。
因此,基本上,在并行化(尝试使用所有内核)和限制分区大小之间存在权衡。将该参数设置为false将确保Spark将尝试合并分区以达到每个分区64MB。
如果您想更改64MB的值,可以通过更改spark.sql.adaptive.advisoryPartitionSizeInBytes来实现。

xqk2d5yq

xqk2d5yq2#

在您的情况下需要两个设置:spark.sql.adaptive.coalescePartitions.enabledTrue和定义spark.sql.adaptive.coalescePartitions.minPartitionNum:这是允许您设置合并分区的最小数量的配置。它可以设置为优化您的用例的值。
我认为在您的情况下,在写入磁盘之前动态计算最佳min_partitions会很有用。建议的解决方案:

from pyspark.sql import DataFrame
from pyspark.rdd import RDD

def get_optimal_min_partitions(df: DataFrame, target_size_per_partition: int = 128 * 1024 * 1024) -> int:
    """
    Get the optimal minimum number of partitions based on dataframe size.
    :param df: The dataframe
    :param target_size_per_partition: The target size per partition in bytes. Default is 128MB.
    :return: The optimal minimum number of partitions.
    """
    # Get the underlying RDD
    rdd = df.rdd

    # Get the size of the data
    total_size = rdd._jrdd.partitions().size()

    # Calculate the optimal minimum number of partitions
    optimal_min_partitions = int(total_size / target_size_per_partition)
    optimal_min_partitions = max(1, optimal_min_partitions)

    return optimal_min_partitions

字符串
在您根据增量df的大小计算出最佳最小分区后,您可以在将最小分区设置为计算值后写入磁盘。

def calculate_and_write_delta(old_df: DataFrame,
                              new_df: DataFrame,
                              id_keys: List[str],
                              deletion_field_name: str,
                              output_path: str,
                              excluded_fields: List[str] = None) -> None:
    # calculate the delta
    delta_df = calculate_delta(old_df, new_df, id_keys, deletion_field_name, excluded_fields)
    
    # calculate optimal minimum partitions
    optimal_min_partitions = get_optimal_min_partitions(delta_df)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", str(optimal_min_partitions))
    
    # write the data
    delta_df.write.parquet(output_path)

相关问题