我想定义函数 MAX_BY
它接受类型的值 T
和类型的排序参数 Number
并根据排序(类型)从窗口返回max元素 T
). 我试过了
public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {
@Override
public T getValue(Tuple2<T, Number> tuple) {
return tuple.f0;
}
@Override
public Tuple2<T, Number> createAccumulator() {
return Tuple2.of(null, 0L);
}
public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
if (order.doubleValue() > acc.f1.doubleValue()) {
acc.f0 = value;
acc.f1 = order;
}
}
}
但是我不能使用 TableEnvironment.registerFunction
. Flink使用的 TypeInformation
为了匹配sql查询中的类型,并且使用这样的定义,它无法确定类型(至少我是这么想的)。我看到有可能提供几个 accumulate
但我认为每个重载方法的返回类型必须相同。
内置聚合函数的工作原理与我想要实现的类似- MAX
可以采用任意列类型并返回相同的类型。这就是为什么我想我也能做到的原因。
1条答案
按热度按时间kuuvgm7e1#
不幸的是,flink不支持具有灵活返回类型的聚合函数。对于
MAX
函数,内部实现定义独立于类型的核心逻辑,然后为每个支持的类型创建一个实现(参见代码)。在内部,
MAX
然后根据类型Map到正确的实现。如果您将函数定义并注册为用户定义的聚合函数,我认为这是不可能的。