Apache Flink表API插入语句

jslywgbw  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(192)

我有一个Flink应用程序,它处理来自2个流的数据。我使用的是Table API,我希望使用一个流1中的数据,查询另一个流2,并获取具有最新时间戳的记录-
我现在有这个-

def insert_into_output(output_table_name, event_table_name, code_table_name):
        return """
        INSERT INTO {0} (ip, sn, code, timestamp)
        SELECT DISTINCT
        ip, sn, code, timestamp
        FROM {2} WHERE
        sn =
        (SELECT 
        sn
        FROM {1}
        WHERE timestamp = 
        (SELECT MAX(timestamp) FROM {1}))
        """.format(output_table_name, event_table_name, code_table_name)

不幸的是,我得到了一个错误声明-doesn't support consuming update and delete changes which is produced by node GroupAggregate(groupBy=[ip, sn, code, timestamp], select=[ip, sn, code, timestamp])。有什么想法吗?

olhwl3o2

olhwl3o21#

使用MAX(TIMESTAMP)执行SQL查询的结果意味着结果可能会不断变化,因为现在的时间戳可能比5分钟前的时间戳更高。因此,此SQL语句的结果是一个收回流。您可以在表到流转换文档中阅读有关此内容的更多信息
您正在发射到Kinesis,但它不支持收回流,仅支持append streams

相关问题