如何在Pyspark中使用盐化技术进行偏聚。假设我们有像下面这样的倾斜数据,如何创建salting列并在聚合中使用它。| 城市|状态|计数|| - ------|- ------|- ------|| 拉琼|锡金|三千|| 浪坡|锡金|五万|| 甘托克|锡金|三十万|| 班加罗尔|卡纳塔克邦|两千五百万|| 孟买|马哈拉施特拉邦|二九亿|
oyxsuwqo1#
要对倾斜数据使用salting技术,我们需要创建一个列,例如“salt”。生成一个范围从0到(sparc.sql.shuffle.partitions - 1)的随机no。表应该如下所示,其中“salt”列的值将从0到199(在本例中分区大小为200)。现在您可以对“city”、“state”、“salt”使用groupBy。| 城市|状态|盐|| - ------|- ------|- ------|| 拉琼|锡金|一百五十一|| 拉琼|锡金|一百零二|| 拉琼|锡金|十六|| 浪坡|锡金|五个|| 浪坡|锡金|十九|| 浪坡|锡金|十六|| 浪坡|锡金|一百零二|| 甘托克|锡金|五十五|| 甘托克|锡金|一百一十九|| 甘托克|锡金|十六|| 甘托克|锡金|十个|| 班加罗尔|卡纳塔克邦|十九|| 孟买|马哈拉施特拉邦|无|| 班加罗尔|卡纳塔克邦|一百九十九|| 孟买|马哈拉施特拉邦|一百九十|代码:
from pyspark.sql import SparkSession, functions as f from pyspark.sql.types import ( StructType, StructField, IntegerType ) salval = f.round(f.rand() * int(spark.conf.get("spark.sql.shuffle.partitions")) -1 ) record_df.withColumn("salt", f.lit(salval).cast(IntegerType()))\ .groupBy("city", "state", "salt")\ .agg( f.count("city") )\ .drop("salt")
输出:| 城市|状态|计数|| - ------|- ------|- ------|| 拉琼|锡金|三千|| 浪坡|锡金|五万|| 甘托克|锡金|三十万|| 班加罗尔|卡纳塔克邦|两千五百万|| 孟买|马哈拉施特拉邦|二九亿|
1条答案
按热度按时间oyxsuwqo1#
要对倾斜数据使用salting技术,我们需要创建一个列,例如“salt”。生成一个范围从0到(sparc.sql.shuffle.partitions - 1)的随机no。
表应该如下所示,其中“salt”列的值将从0到199(在本例中分区大小为200)。现在您可以对“city”、“state”、“salt”使用groupBy。
| 城市|状态|盐|
| - ------|- ------|- ------|
| 拉琼|锡金|一百五十一|
| 拉琼|锡金|一百零二|
| 拉琼|锡金|十六|
| 浪坡|锡金|五个|
| 浪坡|锡金|十九|
| 浪坡|锡金|十六|
| 浪坡|锡金|一百零二|
| 甘托克|锡金|五十五|
| 甘托克|锡金|一百一十九|
| 甘托克|锡金|十六|
| 甘托克|锡金|十个|
| 班加罗尔|卡纳塔克邦|十九|
| 孟买|马哈拉施特拉邦|无|
| 班加罗尔|卡纳塔克邦|一百九十九|
| 孟买|马哈拉施特拉邦|一百九十|
代码:
输出:
| 城市|状态|计数|
| - ------|- ------|- ------|
| 拉琼|锡金|三千|
| 浪坡|锡金|五万|
| 甘托克|锡金|三十万|
| 班加罗尔|卡纳塔克邦|两千五百万|
| 孟买|马哈拉施特拉邦|二九亿|