spark 3.0.1是否支持窗口函数上的自定义聚合器?

zfciruhq  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(506)

我写了一个习惯 Aggregator (a)扩展 org.apache.spark.sql.expressions.Aggregator )spark将其作为 group by 声明:

sparkSession
    .createDataFrame(...)
    .groupBy(col("id"))
    .agg(
        new MyCustomAggregator().toColumn().name("aggregation_result"))
    .show();

我想在window函数中使用它,因为排序对我很重要。我试着这样调用它:

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

这就是我得到的错误:

org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.

在spark 3.0.1中是否可以使用自定义聚合器作为窗口函数?如果是的话,我还缺什么?

kjthegm6

kjthegm61#

是的,spark 3确实支持自定义聚合器作为窗口函数。
以下是java代码:

UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();
``` `AggregationInput` 下面是一个简单的dto,其中包含聚合函数所需的行元素。
所以不管你是否 `group by` 或者作为你仍然想要使用的窗口函数 `org.apache.spark.sql.expressions.Aggregator` .

相关问题