flink:仅使用sql将收缩sql转换为附加sql,以提供时态表

1yjd4xko  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(528)

我为用户提供了一个flinksql接口,所以我不能真正使用table或java/scala接口。一切都需要在sql中指定。不过,我可以解析sql文件中的注解,并添加指定的即席低级api指令。
一个用户如何转换,比如说:

SELECT b, AVG(a) "average" FROM source_data GROUP BY b

name: average_source_data_retracting
  b STRING
  average NUMERIC

-也就是把值缩回一个可以附加它们的窗体。此附加表单可以具有以下架构:

name: average_source_data_appending
  flag BOOLEAN <-- indicating an accumulate or retract message
  b STRING
  average NUMERIC

也就是说,RetractStreamTableLink相当于AppendStreamTableLink,但它不是一个接收器。
所有这些都允许使用平均源数据追加来创建时态表(过滤收回消息),但这种表只接受只追加源表。
我已经考虑过使用windows(正如这里所说的),但是我希望时态表的更新是即时的。

arknldoa

arknldoa1#

请忽略这个问题,显然时态表函数可以接受(对我来说)正在收回的表。
大意是:

SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b

可以接受为一个时态表函数,其中b作为键,max\u proctime作为时间属性。我猜max(proctime)在某种程度上使它认为新行被发出,而它们只是被覆盖?我想我需要更多的时间来理解这一点。
编辑:
通过对源代码的深入研究,我们发现时态表函数似乎接受收回定义,但仅当它处于处理时间下时:
temporalprocesstimejoinoperator.java:

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
        rightState.update(element.getValue());
        registerProcessingCleanupTimer();
    } else {
        rightState.clear();
        cleanupLastTimer();
    }
}

temporalrowtimejoinoperator.java:时间行

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    ...
    checkNotRetraction(row);
    ...
}
private void checkNotRetraction(BaseRow row) {
    if (BaseRowUtil.isRetractMsg(row)) {
        String className = getClass().getSimpleName();
        throw new IllegalStateException(
            "Retractions are not supported by " + className +
                ". If this can happen it should be validated during planning!");
    }
}

这是无证的;我不知道这是否是永久性的,文件是否会更新。

相关问题