我试图在spark中计算一些移动平均值,但遇到了分区倾斜的问题。下面是我尝试执行的简单计算:
获取基础数据
# Variables
one_min = 60
one_hour = 60*one_min
one_day = 24*one_hour
seven_days = 7*one_day
thirty_days = 30*one_day
# Column variables
target_col = "target"
partition_col = "partition_col"
df_base = (
spark
.sql("SELECT * FROM {base}".format(base=base_table))
)
df_product1 = (
df_base
.where(F.col("product_id") == F.lit("1"))
.select(
F.col(target_col).astype("double").alias(target_col),
F.unix_timestamp("txn_timestamp").alias("window_time"),
"transaction_id",
partition_col
)
)
df_product1.persist()
计算运行平均值
window_lengths = {
"1day": one_day,
"7day": seven_days,
"30day": thirty_days
}
# Create window specs for each type
part_windows = {
time: Window.partitionBy(F.col(partition_col))
.orderBy(F.col("window_time").asc())
.rangeBetween(-secs, -one_min)
for (time, secs) in window_lengths.items()
}
cols = [
# Note: not using `avg` as I will be smoothing this at some point
(F.sum(target_col).over(win)/F.count("*").over(win)).alias(
"{time}_avg_target".format(time=time)
)
for time, win in part_windows.items()
]
sample_df = (
df_product1
.repartition(2000, partition_col)
.sortWithinPartitions(F.col("window_time").asc())
.select(
"*",
*cols
)
)
现在,我可以了 collect
这些数据的有限子集(比如说只有100行),但是如果我尝试运行完整的查询,例如,聚合运行的平均值,spark就会在一些特别大的分区上卡住。绝大多数分区中只有不到100万条记录。其中只有大约50人有超过100万的记录,只有大约150人有超过50万的记录
然而,有一小部分人的记录超过了250万份(约10份),其中3份记录超过了500万份。这些分区已运行超过12小时,但未能完成。这些分区中的倾斜是数据的自然部分,表示分区列的这些不同值中的较大活动。我无法控制这个分区列的值的定义。
我用的是 SparkSession
在启用动态分配的情况下,每个执行器有32g ram和4个内核,并且至少有4个执行器。我曾试图将执行器增加到96g,每个执行器8个内核,最少10个执行器,但工作仍然没有完成。
这看起来像一个不需要13小时就能完成的计算。这个 df_product1
Dataframe只包含不到3亿条记录。
如果有其他信息有助于解决此问题,请在下面进行评论。
暂无答案!
目前还没有任何答案,快来回答吧!