使用泛型返回类型创建flinksql udf

wb1gzix0  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(618)

我想定义函数 MAX_BY 它接受类型的值 T 和类型的排序参数 Number 并根据排序(类型)从窗口返回max元素 T ). 我试过了

public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {

    @Override
    public T getValue(Tuple2<T, Number> tuple) {
        return tuple.f0;
    }

    @Override
    public Tuple2<T, Number> createAccumulator() {
        return Tuple2.of(null, 0L);
    }

    public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
        if (order.doubleValue() > acc.f1.doubleValue()) {
            acc.f0 = value;
            acc.f1 = order;
        }
    }
}

但是我不能使用 TableEnvironment.registerFunction . Flink使用的 TypeInformation 为了匹配sql查询中的类型,并且使用这样的定义,它无法确定类型(至少我是这么想的)。我看到有可能提供几个 accumulate 但我认为每个重载方法的返回类型必须相同。
内置聚合函数的工作原理与我想要实现的类似- MAX 可以采用任意列类型并返回相同的类型。这就是为什么我想我也能做到的原因。

kuuvgm7e

kuuvgm7e1#

不幸的是,flink不支持具有灵活返回类型的聚合函数。对于 MAX 函数,内部实现定义独立于类型的核心逻辑,然后为每个支持的类型创建一个实现(参见代码)。
在内部, MAX 然后根据类型Map到正确的实现。
如果您将函数定义并注册为用户定义的聚合函数,我认为这是不可能的。

相关问题