We are trying to run a FlinkSQL query that applies some deduplication and then windows and aggregates the result of that deduplication, but running into the following error at query plan time: org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[order_id], order=[ROWTIME])
We managed to get a simple example query reproducing this issue:
CREATE TABLE Orders (
order_id STRING,
user_id STRING,
product STRING,
num BIGINT,
user_action_time TIMESTAMP(3),
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');
WITH Deduplicated AS (
SELECT
order_id,
user_id,
product,
num,
user_action_time
FROM
(
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY
user_action_time ASC
) AS row_num
FROM
Orders
)
WHERE
row_num = 1
)
SELECT
user_id,
SUM(num) as num_sum
FROM
TABLE(
TUMBLE(
TABLE Deduplicated,
DESCRIPTOR(user_action_time),
INTERVAL '5' MINUTES
)
)
GROUP BY
user_id,
window_start,
window_end
If the same query is run using PROCTIME
instead of ROWTIME
, the query runs successfully.
We are using flink 1.15.0
Is this expected behavior?
2条答案
按热度按时间5lhxktic1#
你能试试这个吗?我不知道是什么问题.
x4shl7ld2#
这目前是不可能的。你可以跟踪https://issues.apache.org/jira/browse/FLINK-27539(还不确定这是一个bug还是Flink的新特性)