I split each subquery out into its own DataFrame.
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from datetime import datetime, timedelta
ranked_goods_query = """
select ...
"""
all_goods_no_query = """
SELECT ...
"""
all_goods_no_df = read_from_redshift(all_goods_no_query)
ranked_goods_df = read_from_redshift(ranked_goods_query)
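# Build every calendar date from the start date through today, then reduce to distinct yyyy-MM months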
given_date = datetime.strptime("2014-06-12", "%Y-%m-%d").date()
today = datetime.today().date()
date_range = [given_date + timedelta(days=x) for x in range((today - given_date).days + 1)]
date_df = spark.createDataFrame([(date,) for date in date_range], ["date"])
filtered_date_df = date_df.select(
    F.date_format(F.col("date"), "yyyy-MM").alias("char_date")
).dropDuplicates().orderBy("char_date")
filtered_date_partitioned = filtered_date_df.repartition(2, "char_date")
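# Every (goods_no, month) combination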
all_combinations_df = all_goods_no_df.crossJoin(filtered_date_partitioned.hint("broadcast"))
gl_goods_query = """
select ...
"""
gl_goods_df = read_from_redshift(gl_goods_query)
goods_df = read_table_from_redshift("goods").select("goods_no", "DEF_GOODS_NM")
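# Cumulative-sum window: per goods_no, ordered by month, from the first month up to the current row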
window_spec = (
    Window.partitionBy("goods_no")
    .orderBy("char_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
all_combinations_df_partitioned = F.broadcast(all_combinations_df.repartition(4, "goods_no"))
ranked_goods_partitioned = ranked_goods_df.repartition(8, "goods_no")
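# Left-join the ranked goods, GL info and goods master onto every (month, goods_no), then compute the running total of sqty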
joined_df = (
    all_combinations_df_partitioned.join(
        ranked_goods_partitioned.hint("broadcast"),
        ["char_date", "goods_no"],
        "left_outer",
    )
    .join(gl_goods_df.hint("broadcast"), "goods_no", "left_outer")
    .join(goods_df.hint("broadcast"), "goods_no", "left_outer")
    .withColumn(
        "CUMULATIVE_QTY",
        F.sum(F.coalesce(F.col("sqty"), F.lit(0))).over(window_spec),
    )
    .select(
        "char_date",
        "goods_no",
        F.when(F.col("goods_nm").isNotNull(), F.col("goods_nm"))
        .otherwise(F.col("DEF_GOODS_NM"))
        .alias("GOODS_NM"),
        "CUMULATIVE_QTY",
    )
    .orderBy("char_date", "goods_no")
)
Getting to this point takes 1.8 seconds.
But...
joined_df.display()
As soon as that one line runs, roughly 9 Spark jobs are launched, so display() takes about 20 seconds to finish.
I want to reduce the number of Spark jobs triggered here and improve the performance.
[Spark UI screenshot of the jobs triggered by display()]
Looking at the DAG Visualization and the Summary metrics, I suspect this part is the problem, but I don't know what to do about it.
1 Answer
Spark uses lazy evaluation: nothing is computed until an action is triggered; transformations only build up the plan. In other words, Spark doesn't actually start doing any work until you call joined_df.display(). Everything before that merely assembles a Spark query plan, which is why it is so fast. Only when display() is called does the query plan get materialized and executed. FYI: here is another answer explaining that broadcast variables are lazy as well.
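A minimal sketch of one way to see this yourself (assuming the joined_df built above and an active SparkSession): explain() only prints the query plan and returns almost instantly, while any action such as count() forces the plan to execute and launches the jobs you see in the UI.

import time

# Inspecting the plan is cheap: it does not run the query.
start = time.time()
joined_df.explain()  # prints the physical plan only
print(f"explain() took {time.time() - start:.2f}s")

# An action forces the reads, joins, window and sort to actually run --
# this is where the multiple jobs and the ~20 seconds come from.
start = time.time()
n = joined_df.count()  # count(), collect() or display() all trigger execution
print(f"count() returned {n} rows in {time.time() - start:.2f}s")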