在pyspark窗口函数中解决分区倾斜的性能问题

kq0g1dla  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(390)

我试图在spark中计算一些移动平均值,但遇到了分区倾斜的问题。下面是我尝试执行的简单计算:

获取基础数据


# Variables

one_min = 60
one_hour = 60*one_min
one_day = 24*one_hour
seven_days = 7*one_day
thirty_days = 30*one_day

# Column variables

target_col = "target"
partition_col = "partition_col"

df_base = (
    spark
    .sql("SELECT * FROM {base}".format(base=base_table))
)

df_product1 = (
    df_base
    .where(F.col("product_id") == F.lit("1"))
    .select(
        F.col(target_col).astype("double").alias(target_col),
        F.unix_timestamp("txn_timestamp").alias("window_time"),
        "transaction_id",
        partition_col
    )
)
df_product1.persist()

计算运行平均值

window_lengths = {
    "1day": one_day,
    "7day": seven_days,
    "30day": thirty_days
}

# Create window specs for each type

part_windows = {
    time: Window.partitionBy(F.col(partition_col))
                .orderBy(F.col("window_time").asc())
                .rangeBetween(-secs, -one_min)
    for (time, secs) in window_lengths.items()
}

cols = [
    # Note: not using `avg` as I will be smoothing this at some point
    (F.sum(target_col).over(win)/F.count("*").over(win)).alias(
        "{time}_avg_target".format(time=time)
    )
    for time, win in part_windows.items()
]

sample_df = (
    df_product1
    .repartition(2000, partition_col)
    .sortWithinPartitions(F.col("window_time").asc())
    .select(
        "*",
        *cols
    )
)

现在,我可以了 collect 这些数据的有限子集(比如说只有100行),但是如果我尝试运行完整的查询,例如,聚合运行的平均值,spark就会在一些特别大的分区上卡住。绝大多数分区中只有不到100万条记录。其中只有大约50人有超过100万的记录,只有大约150人有超过50万的记录
然而,有一小部分人的记录超过了250万份(约10份),其中3份记录超过了500万份。这些分区已运行超过12小时,但未能完成。这些分区中的倾斜是数据的自然部分,表示分区列的这些不同值中的较大活动。我无法控制这个分区列的值的定义。
我用的是 SparkSession 在启用动态分配的情况下,每个执行器有32g ram和4个内核,并且至少有4个执行器。我曾试图将执行器增加到96g,每个执行器8个内核,最少10个执行器,但工作仍然没有完成。
这看起来像一个不需要13小时就能完成的计算。这个 df_product1 Dataframe只包含不到3亿条记录。
如果有其他信息有助于解决此问题,请在下面进行评论。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题