PySpark: how can I improve the performance of display() in Databricks?

gk7wooem · posted 2023-10-15 in Spark

I used `as` to split the subqueries into separate DataFrames.

from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from datetime import datetime, timedelta

ranked_goods_query = """
select  ...
  """

all_goods_no_query = """
SELECT ...
"""

all_goods_no_df = read_from_redshift(all_goods_no_query)

ranked_goods_df = read_from_redshift(ranked_goods_query)

given_date = datetime.strptime("2014-06-12", "%Y-%m-%d").date()
today = datetime.today().date()

date_range = [given_date + timedelta(days=x) for x in range((today - given_date).days + 1)]
date_df = spark.createDataFrame([(date,) for date in date_range], ["date"])

filtered_date_df = date_df.select(
    F.date_format(F.col("date"), "yyyy-MM").alias("char_date")
).dropDuplicates().orderBy("char_date")

filtered_date_partitioned = filtered_date_df.repartition(2, "char_date")

all_combinations_df = all_goods_no_df.crossJoin(filtered_date_partitioned.hint("broadcast"))

gl_goods_query = """
select ...
"""

gl_goods_df = read_from_redshift(gl_goods_query)

goods_df = read_table_from_redshift("goods").select("goods_no", "DEF_GOODS_NM")

window_spec = (
    Window.partitionBy("goods_no")
    .orderBy("char_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

all_combinations_df_partitioned = F.broadcast(all_combinations_df.repartition(4, "goods_no"))
ranked_goods_partitioned = ranked_goods_df.repartition(8, "goods_no")

joined_df = (
    all_combinations_df_partitioned.join(
        ranked_goods_partitioned.hint("broadcast"),
        ["char_date", "goods_no"],
        "left_outer",
    )
    .join(gl_goods_df.hint("broadcast"), "goods_no", "left_outer")
    .join(goods_df.hint("broadcast"), "goods_no", "left_outer")
    .withColumn(
        "CUMULATIVE_QTY",
        F.sum(F.coalesce(F.col("sqty"), F.lit(0))).over(window_spec),
    )
    .select(
        "char_date",
        "goods_no",
        F.when(F.col("goods_nm").isNotNull(), F.col("goods_nm"))
        .otherwise(F.col("DEF_GOODS_NM"))
        .alias("GOODS_NM"),
        "CUMULATIVE_QTY",
    )
    .orderBy("char_date", "goods_no")
)

Getting to this point takes 1.8 seconds.
But...

joined_df.display()

Just running that one line launches roughly 9 Spark jobs, so display() takes about 20 seconds.
I want to reduce the number of Spark jobs triggered here and improve the performance of display().
Looking at the DAG Visualization and the Summary metrics, I suspect it is this part, but I don't know what to do about it.


yuvru6vn · #1

Spark uses lazy evaluation: nothing is computed until an action is triggered; transformations only build up the query plan.
In other words, Spark does not actually start doing any work until you call joined_df.display(). Everything before that just assembles a Spark query plan, which is why it is so fast. Only when display() is called is the query plan materialized and executed.
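A minimal sketch of that split, using a synthetic spark.range DataFrame rather than your Redshift tables (the sizes and column names below are purely illustrative): the transformations return almost instantly because they only extend the plan, and the Spark jobs only appear once an action runs.

import time
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()  # already available on Databricks

t0 = time.time()
df = (
    spark.range(10_000_000)                    # transformation: no job runs
    .withColumn("bucket", F.col("id") % 100)   # transformation: no job runs
    .groupBy("bucket")
    .agg(F.count("*").alias("cnt"))            # transformation: no job runs
)
print(f"plan built in {time.time() - t0:.3f}s")  # typically a few milliseconds

t0 = time.time()
df.count()                                     # action: this is where the jobs launch
print(f"jobs ran in {time.time() - t0:.3f}s")

So the 1.8 seconds you measured is only the cost of building the plan; the ~20 seconds of display() is the actual Redshift reads, the joins, the window, and the final sort. If you call display() on the same result more than once, caching it first (joined_df.cache() followed by an action such as count()) lets later calls reuse the materialized data, although the first materialization still pays the full cost.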
FYI: here is another answer explaining that broadcast variables are also lazy.
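For what it's worth, you can also check what the optimizer does with those broadcast hints without triggering any jobs, because explain() only prints the plan (the mode argument assumes Spark 3.0 or later):

joined_df.explain(mode="formatted")  # prints the physical plan without launching jobs; look for BroadcastHashJoin vs. SortMergeJoin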
