pyspark数据偏斜与窗口函数

mzillmmw  于 2021-05-17  发布在  Spark
关注(0)|答案(2)|浏览(622)

我有一个巨大的pysparkDataframe,我在我的键定义的分区上执行一系列窗口函数。
这个键的问题是,我的分区会因此而扭曲,导致事件时间线看起来像这样,

我知道我可以使用盐渍技术来解决这个问题,当我做一个连接。但是当我使用窗口函数时,如何解决这个问题呢?
我在窗口函数中使用了lag,lead等函数。我不能用咸钥匙做这个过程,因为我会得到错误的结果。
如何解决这种情况下的偏斜?
我正在寻找一种动态的方式来重新划分我的Dataframe,而不会出现倾斜。
根据@jxc的回答更新
我试着创建一个示例df并试着在上面运行代码,

df = pd.DataFrame()
df['id'] = np.random.randint(1, 1000, size=150000)
df['id'] = df['id'].map(lambda x: 100 if x % 2 == 0 else x)
df['timestamp'] = pd.date_range(start=pd.Timestamp('2020-01-01'), periods=len(df), freq='60s')
sdf = sc.createDataFrame(df)
sdf = sdf.withColumn("amt", F.rand()*100)
w = Window.partitionBy("id").orderBy("timestamp")

sdf = sdf.withColumn("new_col", F.lag("amt").over(w) + F.lead("amt").over(w))
x = sdf.toPandas()

这给了我一个这样的事件时间表,

我试过@jxc答案中的代码,

sdf = sc.createDataFrame(df)
sdf = sdf.withColumn("amt", F.rand()*100)

N = 24*3600*365*2
sdf_1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))

w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
w2 = Window.partitionBy('id', 'pid')

sdf_2 = sdf_1.select(
    '*',
    F.count('*').over(w2).alias('cnt'),
    F.row_number().over(w1).alias('rn'),
    (F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_val')
)

sdf_3 = sdf_2.filter('rn in (1, 2, cnt-1, cnt)') \
    .withColumn('new_val', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
    .filter('rn in (1,cnt)')

df_new = sdf_2.filter('rn not in (1,cnt)').union(sdf_3)

x = df_new.toPandas()

最后我又多了一个阶段,活动时间线看起来更偏了,

另外,使用新代码,运行时间也增加了一点

neekobn8

neekobn81#

要处理一个大分区,可以尝试根据orderby列(很可能是数字列或可以转换为数字的日期/时间戳列)将其拆分,以便所有新的子分区都保持正确的行顺序。使用新的分区器处理行,并使用 lag 以及 lead 函数中,只有子分区之间边界周围的行需要进行后处理(下面还讨论了如何在task-2中合并小分区
用你的例子 sdf 假设我们有以下winspec和一个简单的聚合函数:

w = Window.partitionBy('id').orderBy('timestamp')
df.withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w))

任务1:拆分大分区:

请尝试以下操作:
选择一个n来分割时间戳,并设置一个额外的partitionby column pid(使用 ceil , int , floor 等等):


# N to cover 35-days' intervals

N = 24*3600*35
df1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))

将pid添加到partitionby中(参见w1),然后添加calaulte row_number() , lag() 以及 lead() w1以上。还要查找每个新分区中的行数(cnt),以帮助标识分区的结尾( rn == cnt ). 除了每个分区边界上的行以外,大多数行的新值都很好。

w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
w2 = Window.partitionBy('id', 'pid')

df2 = df1.select(
    '*',
    F.count('*').over(w2).alias('cnt'),
    F.row_number().over(w1).alias('rn'),
    (F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_amt')
)

下面是一个例子 df2 显示边界行。

处理边界:选择边界上的行 rn in (1, cnt) 加上计算中使用的值 rn in (2, cnt-1) ,对w上的新值执行相同的计算,并仅保存边界行的结果。

df3 = df2.filter('rn in (1, 2, cnt-1, cnt)') \
    .withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
    .filter('rn in (1,cnt)')

下面显示了从上述df2得到的df3

将df3合并回df2以更新边界行 rn in (1,cnt) ```
df_new = df2.filter('rn not in (1,cnt)').union(df3)

下面的屏幕截图显示了边界行周围的最终df\u new:
![](https://i.stack.imgur.com/zbTjz.jpg)

drop columns which are used to implement logic only

df_new = df_new.drop('cnt', 'rn')

注意事项:
定义了以下3个窗口规范:

w = Window.partitionBy('id').orderBy('timestamp') <-- fix boundary rows
w1 = Window.partitionBy('id', 'pid').orderBy('timestamp') <-- calculate internal rows
w2 = Window.partitionBy('id', 'pid') <-- find #rows in a partition

注意:严格来说,我们最好使用以下方法 `w` 修复边界行以避免绑定的问题 `timestamp` 在边界附近。

w = Window.partitionBy('id').orderBy('pid', 'rn') <-- fix boundary rows

如果您知道哪些分区是倾斜的,只需将它们分开并跳过其他分区即可。现有的方法可能会将一个小分区拆分为2个或更多个分区,如果它们是稀疏分布的

df1 = df.withColumn('pid', F.when(F.col('id').isin('a','b'), F.ceil(F.unix_timestamp('timestamp')/N)).otherwise(1))

对于每个分区,可以检索 `count` (行数)和 `min_ts` =min(时间戳),然后为 `pid` (下图) `M` 是要拆分的行数的阈值):

F.expr(f"IF(count>{M}, ceil((unix_timestamp(timestamp)-unix_timestamp(min_ts))/{N}), 1)")

注意:对于分区内的偏斜,will需要更复杂的函数来生成 `pid` .
要是…就好了 `lag(1)` 函数使用,只需后置处理左边界,过滤方式 `rn in (1, cnt)` 并且只更新 `rn == 1` ```
df3 = df1.filter('rn in (1, cnt)') \
    .withColumn('new_amt', F.lag('amt',1).over(w)) \
    .filter('rn = 1')

类似于领导职能时,我们只需要修复正确的边界和更新 rn == cnt 要是…就好了 lag(2) ,然后使用 df3 :

df3 = df1.filter('rn in (1, 2, cnt-1, cnt)') \
    .withColumn('new_amt', F.lag('amt',2).over(w)) \
    .filter('rn in (1,2)')

您可以将相同的方法扩展到两种情况的混合情况 lag 以及 lead 具有不同的偏移量。

任务2:合并小分区:

基于分区中的记录数 count ,我们可以设置一个阈值 M 所以如果 count>M ,则id保留自己的分区,否则我们将合并分区,以便总记录的#小于 M (以下方法有一个 2*M-2 ).

M = 20000

# create pandas df with columns `id`, `count` and `f`, sort rows so that rows with count>=M are located on top

d2 = pd.DataFrame([ e.asDict() for e in sdf.groupby('id').count().collect() ]) \
    .assign(f=lambda x: x['count'].lt(M)) \
    .sort_values('f')    

# add pid column to merge smaller partitions but the total row-count in partition should be less than or around M

# potentially there could be at most `2*M-2` records for the same pid, to make sure strictly count<M, use a for-loop to iterate d1 and set pid:

d2['pid'] = (d2.mask(d2['count'].gt(M),M)['count'].shift(fill_value=0).cumsum()/M).astype(int)

# add pid to sdf. In case join is too heavy, try using Map

sdf_1 = sdf.join(spark.createDataFrame(d2).alias('d2'), ["id"]) \
    .select(sdf["*"], F.col("d2.pid"))

# check pid: # of records and # of distinct ids

sdf_1.groupby('pid').agg(F.count('*').alias('count'), F.countDistinct('id').alias('cnt_ids')).orderBy('pid').show()
+---+-----+-------+                                                             
|pid|count|cnt_ids|
+---+-----+-------+
|  0|74837|      1|
|  1|20036|    133|
|  2|20052|    134|
|  3|20010|    133|
|  4|15065|    100|
+---+-----+-------+

现在,新窗口应该单独按pid进行分区,并将id移到 orderBy ,见下表:

w3 = Window.partitionBy('pid').orderBy('id','timestamp')

基于上述w3 winspec自定义滞后/超前函数,然后计算新的值:

lag_w3  = lambda col,n=1: F.when(F.lag('id',n).over(w3) == F.col('id'), F.lag(col,n).over(w3))
lead_w3 = lambda col,n=1: F.when(F.lead('id',n).over(w3) == F.col('id'), F.lead(col,n).over(w3))

sdf_new = sdf_1.withColumn('new_val', lag_w3('amt',1) + lead_w3('amt',1))
pn9klfpd

pn9klfpd2#

要处理这种扭曲的数据,有几种方法可以尝试。
如果您使用databricks来运行作业,并且您知道哪列将有倾斜,那么您可以尝试一个名为skew hint的选项
我建议您使用spark 3.0,因为您可以选择使用自适应查询执行(aqe),它可以处理大多数问题,从而改善您的作业运行状况,并可能更快地运行它们。
通常,我建议在进行任何大范围操作之前使数据分区的大小更均匀,增加集群大小确实有帮助,但我不确定这是否对您有效。

相关问题