Flink SQL watermark strategy after a join operation

9rnv2umw  posted on 2022-12-09  in Apache

My problem is that I cannot use the ORDER BY clause after the JOIN operation. To reproduce the problem,

CREATE TABLE stack (
    id INT PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '100'
);

This table has a watermark strategy, and ts has the TIMESTAMP(3) *ROWTIME* type.

Flink SQL> DESC stack;
+------+------------------------+-------+---------+--------+----------------------------+
| name |                   type |  null |     key | extras |                  watermark |
+------+------------------------+-------+---------+--------+----------------------------+
|   id |                    INT | FALSE | PRI(id) |        |                            |
|   ts | TIMESTAMP(3) *ROWTIME* |  TRUE |         |        | `ts` - INTERVAL '1' SECOND |
+------+------------------------+-------+---------+--------+----------------------------+
2 rows in set

However, if I define a view as a simple self-join

CREATE VIEW self_join AS (
SELECT l.ts, l.id, r.id
FROM stack as l INNER JOIN stack as r
ON l.id=r.id
);

it loses the watermark strategy but not the type,

Flink SQL> DESC self_join;
+------+------------------------+-------+-----+--------+-----------+
| name |                   type |  null | key | extras | watermark |
+------+------------------------+-------+-----+--------+-----------+
|   ts | TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
|   id |                    INT | FALSE |     |        |           |
|  id0 |                    INT | FALSE |     |        |           |
+------+------------------------+-------+-----+--------+-----------+
3 rows in set

I assumed that the watermark strategy would be preserved so that I could use ORDER BY after a JOIN operation, but this is not the case. How can I add a watermark strategy to the view again?
Thanks in advance.

qhhrdooz #1

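Whenever Flink SQL executes a regular join (a join without any temporal constraints) in streaming mode, the result cannot have watermarks, which means you cannot sort the result or apply windowing to it.
Why is that, and what can you do about it?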

Background

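Flink SQL relies on time attributes. Because the stack stream/table has a time attribute, we know that the stream will be processed more or less in order by time (elements are constrained to be at most 1 second out of order). This in turn places a tight bound on how much state must be kept to perform an operation such as sorting this table: a buffer 1 second long is enough.
If stack did not define a time attribute (that is, a timestamp field with a watermark defined on it), Flink SQL would refuse to sort it in streaming mode, because doing so would require keeping an unbounded amount of state, and there would be no way to know how long to wait before emitting the first result.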
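
As a minimal sketch of this constraint, assuming the stack table from the question and a session running in streaming mode, the two queries below differ only in the sort key. In streaming mode the first is expected to be accepted because its primary sort key is the ascending time attribute, and the second is expected to be rejected at planning time. The exact error message depends on the Flink version, so none is quoted here.

```sql
-- Primary sort key is the time attribute ts, ascending:
-- the planner only needs to buffer rows until the watermark passes them.
SELECT id, ts
FROM stack
ORDER BY ts;

-- Primary sort key is not a time attribute:
-- sorting would need unbounded state, so streaming mode is expected to refuse this.
SELECT id, ts
FROM stack
ORDER BY id;
```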

The result of a regular join cannot have a well-defined watermark strategy

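Any kind of regular join requires Flink to store all rows of the input tables in its state backend forever (which Flink is willing to try to do). More importantly, watermarks are not well defined on the result, because nothing constrains how out of order it may be.

What you can do

If you change the join into an interval join or a temporal join, the result will still have watermarks. For example, you could do this: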

CREATE VIEW self_join AS (
  SELECT l.ts, l.id, r.id
  FROM stack as l INNER JOIN stack as r
  ON l.id=r.id
  WHERE l.ts BETWEEN r.ts - INTERVAL '1' MINUTE AND r.ts
);

Or you could do this:

CREATE VIEW self_join AS (
  SELECT l.ts, l.id, r.id
  FROM stack AS l INNER JOIN stack FOR SYSTEM_TIME AS OF l.ts AS r
  ON l.id=r.id
);

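In both cases, Flink's SQL engine can keep less state than it would for a regular join, and it can produce watermarks in the output stream/table.
Another possible solution is to convert the result table to a DataStream, apply watermarks with the DataStream API, and then convert the stream back to a table. However, this only makes sense if you have some domain knowledge that tells you how out of order the result stream can be, and in that case you could probably have expressed the same information as an interval join or a temporal join.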
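
A quick way to check this, sketched under the assumption that self_join was created with one of the two time-constrained definitions above, is to run the sort that failed in the question. The column name ts comes from the view definition. Whether the sort is accepted depends on the planner still treating ts as a time attribute in your Flink version.

```sql
-- Assumes self_join is the interval-join or temporal-join view defined above.
-- Sorting on the ascending time attribute is the operation the question asked for.
SELECT ts, id
FROM self_join
ORDER BY ts;
```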
