I have a simple interval join between two unbounded streams. It works with small workloads, but in a larger (production) environment it no longer does. From observing the output I can see that the Flink SQL job triggers/emits records only once the entire topic has been scanned (and consequently read into memory?), but I want the job to emit a record as soon as a single match is found, since in my production environment the job cannot withstand reading the entire table into memory.
The interval join which I'm making is very similar to the examples provided here: https://github.com/ververica/flink-sql-cookbook/blob/main/joins/02_interval_joins/02_interval_joins.md
SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(DAY, o.order_time, s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE
  o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
Except my time interval is as small as possible (a couple of seconds). I also have a 5-second watermark on the Flink SQL source tables.
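For reference, a 5-second watermark like the one described would typically be declared in the source-table DDL. This is only a sketch; the table name, columns, and `datagen` connector are placeholders, not the actual setup:

```sql
-- Hypothetical source table; column names and connector are assumptions.
CREATE TABLE orders (
  id BIGINT,
  order_time TIMESTAMP(3),
  -- Event-time attribute with a 5-second out-of-orderness bound:
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);
```

The watermark delay matters here: an interval join can only emit a pair once the watermark on both inputs has passed the end of the join interval for that pair.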
How can I instruct Flink to emit/trigger records as soon as it has made a single match in the join? Currently the job tries to scan the entire table before emitting any records, which is not feasible at my data volumes. From my understanding it should only need to buffer rows up to the interval (time window), and once the interval has passed the matched record should be emitted/triggered.
Also from observing the cluster I can see that the watermark is moving, but no records are being emitted.
1 Answer
Maybe some data was dropped; check whether your event times are reasonable (records whose timestamps lag too far behind the watermark are discarded). In this scenario, you can also try a regular join and set a 3-day state TTL (`table.exec.state.ttl`), which guarantees output for every record that joins.
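A sketch of that suggestion, using the original query from the question (the `SET` syntax works in the Flink SQL client; the duration string follows Flink's configuration format):

```sql
-- Bound the regular join's state to 3 days so it doesn't grow forever.
SET 'table.exec.state.ttl' = '3 d';

-- A regular join emits a result as soon as a matching pair arrives,
-- instead of waiting for the watermark to close an interval.
SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(DAY, o.order_time, s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id;
```

The trade-off: a regular join does not prune matches by time, so rows older than the TTL are silently forgotten rather than deterministically excluded by the interval condition.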