flink排序

9w11ddsr  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(379)

我正在尝试使用表api对批处理表执行窗口操作。
我的批处理表具有以下架构:

root
 |-- startdate: TIMESTAMP(3)
 |-- enddate: TIMESTAMP(3)
 |-- quantity: LEGACY(BigDecimal)
 |-- productid: LEGACY(BigDecimal)

基于此模式,我尝试执行以下操作:

val env = BatchTableEnvironment.create(fsEnv)
val ds = ...
val table = env.fromDataSet("id, startdate, enddate, quantity")
val result = table
.window(Over.partitionBy("productid").orderBy("startdate").as("w"))
.select("startdate, enddate, quantity.sum over w as sumquantity, productid")

在执行上述代码时,出现以下错误:

Ordering must be defined on a time attribute.

但当我查看flink源代码时,我发现了以下评论:

/**
 * Specifies the time attribute on which rows are ordered.
 *
 * <p>For streaming tables, reference a rowtime or proctime time attribute here
 * to specify the time mode.
 *
 * <p>For batch tables, refer to a timestamp or long attribute.
 *
 * @param orderBy field reference
 * @return an over window with defined order
 */
public OverWindowPartitionedOrdered orderBy(String orderBy) {
    return this.orderBy(ExpressionParser.parseExpression(orderBy));
}

这里它清楚地说:“对于批处理表,请参阅timestamp或long属性。”。如您所见,startdate属性的类型是timestamp。我用的是flink 1.9
这是虫子还是我遗漏了什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题