我有一个Flink应用程序,它处理来自2个流的数据。我使用的是Table API,我希望使用一个流1中的数据,查询另一个流2,并获取具有最新时间戳的记录-
我现在有这个-
def insert_into_output(output_table_name, event_table_name, code_table_name):
return """
INSERT INTO {0} (ip, sn, code, timestamp)
SELECT DISTINCT
ip, sn, code, timestamp
FROM {2} WHERE
sn =
(SELECT
sn
FROM {1}
WHERE timestamp =
(SELECT MAX(timestamp) FROM {1}))
""".format(output_table_name, event_table_name, code_table_name)
不幸的是,我得到了一个错误声明-doesn't support consuming update and delete changes which is produced by node GroupAggregate(groupBy=[ip, sn, code, timestamp], select=[ip, sn, code, timestamp])
。有什么想法吗?
1条答案
按热度按时间olhwl3o21#
使用
MAX(TIMESTAMP)
执行SQL查询的结果意味着结果可能会不断变化,因为现在的时间戳可能比5分钟前的时间戳更高。因此,此SQL语句的结果是一个收回流。您可以在表到流转换文档中阅读有关此内容的更多信息您正在发射到Kinesis,但它不支持收回流,仅支持append streams