我有一个数据集,我正试图在pyspark中处理。数据(在磁盘上作为parquet)包含用户id、会话id和与每个会话相关的元数据。我正在向我的Dataframe添加一些列,这些列是通过窗口聚合的结果。我遇到的问题是,除了4-6个执行者之外,所有的执行者都会很快完成,其余的执行者永远不会完成。我的代码如下所示:
import pyspark.sql.functions as f
from pyspark.sql.window import Window
empty_col_a_cond = ((f.col("col_A").isNull()) |
(f.col("col_A") == ""))
session_window = Window.partitionBy("user_id", "session_id") \
.orderBy(f.col("step_id").asc())
output_df = (
input_df
.withColumn("col_A_val", f
.when(empty_col_a_cond, f.lit("NA"))
.otherwise(f.col("col_A")))
# ... 10 more added columns replacing nulls/empty strings
.repartition("user_id", "session_id")
.withColumn("s_user_id", f.first("user_id", True).over(session_window))
.withColumn("s_col_B", f.collect_list("col_B").over(session_window))
.withColumn("s_col_C", f.min("col_C").over(session_window))
.withColumn("s_col_D", f.max("col_D").over(session_window))
# ... 16 more added columns aggregating over session_window
.where(f.col("session_flag") == 1)
.where(f.array_contains(f.col("s_col_B"), "some_val"))
)
在我的日志中,我一遍又一遍地看到:
INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to disk (2 times so far)
INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to disk (0 time so far)
这表明spark不能在内存中保存所有的窗口数据。我试着增加内部设置 spark.sql.windowExec.buffer.in.memory.threshold
以及 spark.sql.windowExec.buffer.spill.threshold
,这有点帮助,但仍然有执行人没有完成。
我相信这都是由于数据中的一些偏差造成的。按两者分组 user_id
以及 session_id
,有5个条目的计数>=10000,100条记录的计数介于1000和10000之间,150000条记录的计数小于1000(通常计数=1)。
input_df \
.groupBy(f.col("user_id"), f.col("session_id")) \
.count() \
.filter("count < 1000") \
.count()
# >= 10k, 6
# < 10k and >= 1k, 108
# < 1k, 150k
这是生成的作业dag:
暂无答案!
目前还没有任何答案,快来回答吧!