我有以下功能:
def calculate_delta(old_df: DataFrame,
new_df: DataFrame,
id_keys: List[str],
deletion_field_name: str,
excluded_fields: List[str] = None) -> DataFrame:
# make sure both the new_df and old_df have the same columns order
if excluded_fields:
old_df = old_df.drop(*excluded_fields)
shared_columns = set(old_df.columns).intersection(new_df.columns)
new_df = new_df.select(*id_keys, f.struct(*shared_columns).alias("new_struct"))
old_df = old_df.select(*id_keys, f.struct(*shared_columns).alias("old_struct"))
return new_df.join(old_df, on=id_keys, how="outer") \
.filter("old_struct IS NULL OR new_struct IS NULL OR new_struct != old_struct") \
.withColumn(deletion_field_name, f.expr("new_struct is null"))\
.withColumn("output_struct", f.when(f.col("new_struct").isNull(), f.col("old_struct")).otherwise(f.col("new_struct")))\
.selectExpr(f"(new_struct IS NULL) as {deletion_field_name}",
"output_struct.*")
字符串
该函数采用 Dataframe 的2个版本,并且仅保留修改/删除的行。后连接有大约1.5亿行,后过滤器它真的取决于,可以有50行,可以有50 M行,这取决于当天发生了什么变化。
我设置了"spark.sql.shuffle.partitions": "720"
。
问题是,在使用该函数后,我正在将数据写入文件,并且它总是尝试写入720个文件(有时根据是否有空分区而写入较少),这是很多,我想使用AQE动态合并功能来减少分区数量,从而动态写入文件的数量。我真的不能合并/重新分区到一个静态的数量,因为它是动态的,有时会有50修改/删除行,有时可能有50 M行。我还尝试了在写之前通过id键重新分区,以强制AQE执行动态合并,它似乎工作正常,但我真的想避免这种解决方案,因为它是一个冗余的 Shuffle
所以基本上我想要的是动态地减少输出文件的数量,如果可能的话,有什么建议吗?
我用的是Spark3.3
谢啦,谢啦
2条答案
按热度按时间8yparm6h1#
确保底层配置参数设置为
true
:spark.sql.adaptive.enabled
个spark.sql.adaptive.coalescePartitions.enabled
个在Spark v3.3.0中,默认情况下应该是这样。
另外,请确保将以下配置参数设置为
false
:spark.sql.adaptive.coalescePartitions.parallelismFirst
个在Spark v3.3.0中,默认情况下不是。
关于
spark.sql.adaptive.coalescePartitions.parallelismFirst
的文档:当为true时,Spark在合并连续的shuffle分区时忽略spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小(默认为64MB),并且只考虑spark.sql.adaptive.coalescePartitions.minPartitionSize指定的最小分区大小(默认为1MB),以最大化并行度。这是为了在启用自适应查询执行时避免性能下降。建议将此配置设置为false,并遵守spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小。
因此,基本上,在并行化(尝试使用所有内核)和限制分区大小之间存在权衡。将该参数设置为
false
将确保Spark将尝试合并分区以达到每个分区64MB。如果您想更改64MB的值,可以通过更改
spark.sql.adaptive.advisoryPartitionSizeInBytes
来实现。xqk2d5yq2#
在您的情况下需要两个设置:
spark.sql.adaptive.coalescePartitions.enabled
到True
和定义spark.sql.adaptive.coalescePartitions.minPartitionNum
:这是允许您设置合并分区的最小数量的配置。它可以设置为优化您的用例的值。我认为在您的情况下,在写入磁盘之前动态计算最佳min_partitions会很有用。建议的解决方案:
字符串
在您根据增量df的大小计算出最佳最小分区后,您可以在将最小分区设置为计算值后写入磁盘。
型