我知道有几种方法可以对sparkDataframe列的聚合进行计算。太多了。。。现在还不清楚哪一个是“最好的”,或者怎样才能像我所希望的那样恰当地概括它们。我看到一些stackoverflow解决方案与spark/databricks本身的一个或另一个建议相冲突,但没有一个能完全满足我的要求。
下面是一段小代码,显示了我正在尝试做什么(使用某种任意计算的多列聚合),以及工作方式,但我无法智能地比较或评估:
数据集
df = sc.parallelize([
[100, 'A', 1],
[100, 'A', 2],
[101, 'A', 3],
[102, 'B', 4],
[103, 'B', 5],
[102, 'B', 6],
]).toDF(('id', 'grp', 'val'))
使用applyin
def calc_q1(pddf):
newcol = pddf.val.quantile(q=0.25)
return pddf.assign(val_Q1 = newcol)
grp_df = df.groupBy('grp', 'id').applyInPandas(calc_q1, schema="grp string, id long, val long, val_Q1 double")
由此产生:
赞成的意见:
所有功能现在都可用
任意复杂性/功能
这似乎是spark/databricks最推荐的方法
欺骗:
似乎需要硬编码(即,我必须指定 val
在 calc_q1
函数)。
我无法返回多个值(在后面的示例中,您将看到返回的结果列表)。
必须提供输出模式是一件痛苦的事情,而且还需要一些硬编码。
每个聚合(传递给函数的每个Dataframe)必须适合内存,因为正在调用一个函数
这不是我工作中的问题,但我很好奇:我认为这是我列出的所有选项的限制,而不仅仅是Pandas选项。
使用窗口
from pyspark.sql import Window
import pyspark.sql.functions as F
grp_window = Window.partitionBy('id', 'grp')
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
grp_df = df.withColumn('med_val', quantiles.over(grp_window))
由此产生:
赞成的意见:
使用 functions.expr
似乎和依赖所有的Pandas一样“开放”和功利。
欺骗:
这有点费劲
可能因为这个也慢了??我没有比较过时间。
使用 Window
以某种方式执行聚合只是感觉“不对”。
我仍然不能执行任何我想要的表达式,只是任何pyspark表达式。
使用groupby
import pyspark.sql.functions as F
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
grp_df = df.groupBy('grp', 'id').agg(quantiles.alias('med_val'))
由此产生:
赞成的意见:
这就是我已经用于更简单计算(不需要自定义项或多级聚合的计算)的方法。
它“感觉”正确,就像我用的动词一样 groupBy
以及 agg
与我正在做的事情的概念和语义完全一致。
欺骗:
我不知道,但既然这是一种“较老”的做事方式,我觉得一定有理由做出更新的方法。
我仍然不能执行任何我想要的表达式,只是任何pyspark表达式。
我想要达到的目标
我想找到一种方法把参数传递给聚合函数,这样它就可以被推广了。但我不知道怎么做。
我希望能够做到这样:
def calc_quants(pddf, col, quantcol, quants):
pddf[quantcol] = pddf[col].quantile(q=quants) # Where `quants` is a list of values
return pddf
grp_df = (df.groupBy('grp', 'id')
.applyInPandas(calc_quants, val_quants, [0.25, 0.75],
schema="grp string, id long, val long, val_quants list"))
能够像我上面写的那样做是我最终的愿望。我列出了我发现的所有方法,因为它们似乎都不能做我想做的事情。
暂无答案!
目前还没有任何答案,快来回答吧!