Pyspark:如何使用倾斜骨料的盐渍技术

kgqe7b3p  于 2023-02-21  发布在  Spark
关注(0)|答案(1)|浏览(162)

如何在Pyspark中使用盐化技术进行偏聚。
假设我们有像下面这样的倾斜数据,如何创建salting列并在聚合中使用它。
| 城市|状态|计数|
| - ------|- ------|- ------|
| 拉琼|锡金|三千|
| 浪坡|锡金|五万|
| 甘托克|锡金|三十万|
| 班加罗尔|卡纳塔克邦|两千五百万|
| 孟买|马哈拉施特拉邦|二九亿|

oyxsuwqo

oyxsuwqo1#

要对倾斜数据使用salting技术,我们需要创建一个列,例如“salt”。生成一个范围从0到(sparc.sql.shuffle.partitions - 1)的随机no。
表应该如下所示,其中“salt”列的值将从0到199(在本例中分区大小为200)。现在您可以对“city”、“state”、“salt”使用groupBy。
| 城市|状态|盐|
| - ------|- ------|- ------|
| 拉琼|锡金|一百五十一|
| 拉琼|锡金|一百零二|
| 拉琼|锡金|十六|
| 浪坡|锡金|五个|
| 浪坡|锡金|十九|
| 浪坡|锡金|十六|
| 浪坡|锡金|一百零二|
| 甘托克|锡金|五十五|
| 甘托克|锡金|一百一十九|
| 甘托克|锡金|十六|
| 甘托克|锡金|十个|
| 班加罗尔|卡纳塔克邦|十九|
| 孟买|马哈拉施特拉邦|无|
| 班加罗尔|卡纳塔克邦|一百九十九|
| 孟买|马哈拉施特拉邦|一百九十|
代码:

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import (
    StructType, StructField, IntegerType
)

salval = f.round(f.rand() * int(spark.conf.get("spark.sql.shuffle.partitions")) -1 )

record_df.withColumn("salt", f.lit(salval).cast(IntegerType()))\
    .groupBy("city", "state", "salt")\
    .agg(
      f.count("city")
    )\
    .drop("salt")

输出:
| 城市|状态|计数|
| - ------|- ------|- ------|
| 拉琼|锡金|三千|
| 浪坡|锡金|五万|
| 甘托克|锡金|三十万|
| 班加罗尔|卡纳塔克邦|两千五百万|
| 孟买|马哈拉施特拉邦|二九亿|

相关问题