pyspark多级聚合数据自定义框架;我怎样才能正确地概括这一点呢

zzlelutf  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(201)

我知道有几种方法可以对sparkDataframe列的聚合进行计算。太多了。。。现在还不清楚哪一个是“最好的”,或者怎样才能像我所希望的那样恰当地概括它们。我看到一些stackoverflow解决方案与spark/databricks本身的一个或另一个建议相冲突,但没有一个能完全满足我的要求。
下面是一段小代码,显示了我正在尝试做什么(使用某种任意计算的多列聚合),以及工作方式,但我无法智能地比较或评估:

数据集

df = sc.parallelize([
    [100, 'A', 1],
    [100, 'A', 2],
    [101, 'A', 3],
    [102, 'B', 4],
    [103, 'B', 5],
    [102, 'B', 6],
]).toDF(('id', 'grp', 'val'))

使用applyin

def calc_q1(pddf):
  newcol = pddf.val.quantile(q=0.25)
  return pddf.assign(val_Q1 = newcol)

grp_df = df.groupBy('grp', 'id').applyInPandas(calc_q1, schema="grp string, id long, val long, val_Q1 double")

由此产生:

赞成的意见:
所有功能现在都可用
任意复杂性/功能
这似乎是spark/databricks最推荐的方法
欺骗:
似乎需要硬编码(即,我必须指定 valcalc_q1 函数)。
我无法返回多个值(在后面的示例中,您将看到返回的结果列表)。
必须提供输出模式是一件痛苦的事情,而且还需要一些硬编码。
每个聚合(传递给函数的每个Dataframe)必须适合内存,因为正在调用一个函数
这不是我工作中的问题,但我很好奇:我认为这是我列出的所有选项的限制,而不仅仅是Pandas选项。

使用窗口

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('id', 'grp')
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

grp_df = df.withColumn('med_val', quantiles.over(grp_window))

由此产生:

赞成的意见:
使用 functions.expr 似乎和依赖所有的Pandas一样“开放”和功利。
欺骗:
这有点费劲
可能因为这个也慢了??我没有比较过时间。
使用 Window 以某种方式执行聚合只是感觉“不对”。
我仍然不能执行任何我想要的表达式,只是任何pyspark表达式。

使用groupby

import pyspark.sql.functions as F

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

grp_df = df.groupBy('grp', 'id').agg(quantiles.alias('med_val'))

由此产生:

赞成的意见:
这就是我已经用于更简单计算(不需要自定义项或多级聚合的计算)的方法。
它“感觉”正确,就像我用的动词一样 groupBy 以及 agg 与我正在做的事情的概念和语义完全一致。
欺骗:
我不知道,但既然这是一种“较老”的做事方式,我觉得一定有理由做出更新的方法。
我仍然不能执行任何我想要的表达式,只是任何pyspark表达式。

我想要达到的目标

我想找到一种方法把参数传递给聚合函数,这样它就可以被推广了。但我不知道怎么做。
我希望能够做到这样:

def calc_quants(pddf, col, quantcol, quants):
  pddf[quantcol] = pddf[col].quantile(q=quants) # Where `quants` is a list of values
  return pddf

grp_df = (df.groupBy('grp', 'id')
            .applyInPandas(calc_quants, val_quants, [0.25, 0.75],
                           schema="grp string, id long, val long, val_quants list"))

能够像我上面写的那样做是我最终的愿望。我列出了我发现的所有方法,因为它们似乎都不能做我想做的事情。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题