Tracking changes in aggregated values with PyFlink

ws51t4hk  posted on 2022-12-09  in  Apache

Suppose we have an incoming stream of ad click events and we want to track the number of clicks each ad got in the last 5 minutes.
Our input schema is

ad_id      VARCHAR(10),
clicked_at TIMESTAMP(3)

and the output schema is

ad_id      VARCHAR(10),
clicks     INT,
updated_at TIMESTAMP(3)

Using the Table API in PyFlink, we can use something like

t_env.from_path(INPUT_TABLE_NAME) \
     .window(Slide.over('5.minutes')
                  .every('1.minute')
                  .on('clicked_at')
                  .alias('w')) \
     .group_by('ad_id, w') \
     .select('ad_id, ad_id.count as clicks, w.end as updated_at') \
     .execute_insert(OUTPUT_TABLE_NAME)

to update the counters every minute and output the results.
For example, if the input data was

| ad_id | clicked_at |
| ----- | ---------- |
| ad1   | 12:00:01   |
| ad2   | 12:00:02   |
| ad3   | 12:00:03   |
| ad1   | 12:01:01   |
| ad2   | 12:02:02   |
| ad3   | 12:03:03   |

the output will probably look like this:

| ad_id | clicks | updated_at |
| ----- | ------ | ---------- |
| ad1   |      1 | 12:01:00   |
| ad2   |      1 | 12:01:00   |
| ad3   |      1 | 12:01:00   |
| ad1   |      2 | 12:02:00   |
| ad2   |      1 | 12:02:00   |
| ad3   |      1 | 12:02:00   |
| ad1   |      2 | 12:03:00   |
| ad2   |      2 | 12:03:00   |
| ad3   |      1 | 12:03:00   |
| ad1   |      2 | 12:04:00   |
| ad2   |      2 | 12:04:00   |
| ad3   |      2 | 12:04:00   |
| ad1   |      2 | 12:05:00   |
| ad2   |      2 | 12:05:00   |
| ad3   |      2 | 12:05:00   |
| ad1   |      1 | 12:06:00   |
| ad2   |      1 | 12:06:00   |
| ad3   |      1 | 12:06:00   |
| ad2   |      1 | 12:07:00   |
| ad3   |      1 | 12:07:00   |
| ad3   |      1 | 12:08:00   |

While the code above produces the desired results, it isn't really space-efficient, and the output table contains a lot of redundant data. If there were a million ads in our system, the code above would output a million records every minute!
How can we modify the code to output a new record only when the counter values actually change? Following the example above, can we make the output look like this?

| ad_id | clicks | updated_at |
| ----- | ------ | ---------- |
| ad1   |      1 | 12:01:00   |
| ad2   |      1 | 12:01:00   |
| ad3   |      1 | 12:01:00   |
| ad1   |      2 | 12:02:00   |
| ad2   |      2 | 12:03:00   |
| ad3   |      2 | 12:04:00   |
| ad1   |      1 | 12:06:00   |
| ad2   |      1 | 12:06:00   |
| ad3   |      1 | 12:06:00   |

Bonus question: is there a simple way to get an extra record when a counter goes back to 0, so that our output looks like this?

| ad_id | clicks | updated_at |
| ----- | ------ | ---------- |
| ad1   |      1 | 12:01:00   |
| ad2   |      1 | 12:01:00   |
| ad3   |      1 | 12:01:00   |
| ad1   |      2 | 12:02:00   |
| ad2   |      2 | 12:03:00   |
| ad3   |      2 | 12:04:00   |
| ad1   |      1 | 12:06:00   |
| ad2   |      1 | 12:06:00   |
| ad3   |      1 | 12:06:00   |
| ad1   |      0 | 12:07:00   |
| ad2   |      0 | 12:08:00   |
| ad3   |      0 | 12:09:00   |

rqenqsqc  #1

For the first part of your question, you could use MATCH_RECOGNIZE (or a self-join) to detect when there's been a change, and only emit those. This will increase the amount of state Flink is keeping (if you use a self-join, be especially careful).
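To make the "only emit changes" idea concrete, here is a minimal sketch in plain Python, not PyFlink. It simulates what a MATCH_RECOGNIZE pattern or a self-join would have to compute: remember the last count emitted per ad and forward a row only when the count differs. The function name `emit_on_change` and the list `windowed` are hypothetical; the rows are copied from the per-minute output table in the question.

```python
def emit_on_change(rows):
    """Yield only the rows whose count differs from the previous count of the same ad.

    rows: iterable of (ad_id, clicks, updated_at), ordered by updated_at.
    This is a plain-Python simulation of the change-detection logic,
    not a PyFlink API.
    """
    last_count = {}  # one entry per ad_id: this is the extra state being kept
    for ad_id, clicks, updated_at in rows:
        if last_count.get(ad_id) != clicks:
            last_count[ad_id] = clicks
            yield ad_id, clicks, updated_at


# Per-minute sliding-window results, copied from the question's output table.
windowed = [
    ("ad1", 1, "12:01:00"), ("ad2", 1, "12:01:00"), ("ad3", 1, "12:01:00"),
    ("ad1", 2, "12:02:00"), ("ad2", 1, "12:02:00"), ("ad3", 1, "12:02:00"),
    ("ad1", 2, "12:03:00"), ("ad2", 2, "12:03:00"), ("ad3", 1, "12:03:00"),
    ("ad1", 2, "12:04:00"), ("ad2", 2, "12:04:00"), ("ad3", 2, "12:04:00"),
    ("ad1", 2, "12:05:00"), ("ad2", 2, "12:05:00"), ("ad3", 2, "12:05:00"),
    ("ad1", 1, "12:06:00"), ("ad2", 1, "12:06:00"), ("ad3", 1, "12:06:00"),
    ("ad2", 1, "12:07:00"), ("ad3", 1, "12:07:00"),
    ("ad3", 1, "12:08:00"),
]

for row in emit_on_change(windowed):
    print(row)
```

The `last_count` dictionary holds one entry per ad, which is the kind of additional per-key state mentioned above.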
For the second part, including zeros in the report is harder, as Flink's window operators only produce results based on processing events (empty windows don't exist and can't produce results). I don't think this can be done efficiently without either resorting to the DataStream API (with a KeyedProcessFunction or CEP) or adding dummy events to the stream so that the windows aren't empty, and then filtering those events out of the results.
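As a rough illustration of the timer-based route, the sketch below simulates in plain Python (again, not PyFlink) the per-key logic a KeyedProcessFunction might run: for each ad, re-evaluate the 5-minute count at every minute boundary, emit only when it changes, and stop once it has dropped back to 0. The name `counts_with_zeros`, its parameters, and the encoding of timestamps as seconds after 12:00:00 are assumptions made for this example.

```python
from collections import defaultdict


def counts_with_zeros(clicks, size=300, slide=60):
    """Simulate per-key timers over a sliding window, including a final 0.

    clicks: iterable of (ad_id, ts), ts in seconds (here: seconds after 12:00:00).
    Returns a list of (ad_id, count, window_end) emitted only when count changes.
    Batch simulation for illustration only; a real job would keep this
    per-key state in Flink and register timers instead of looping.
    """
    by_ad = defaultdict(list)
    for ad_id, ts in clicks:
        by_ad[ad_id].append(ts)

    out = []
    for ad_id, times in by_ad.items():
        times.sort()
        # First "timer": the first slide boundary strictly after the first click.
        end = (times[0] // slide + 1) * slide
        last = 0
        while True:
            # Count clicks inside the window [end - size, end).
            count = sum(1 for t in times if end - size <= t < end)
            if count != last:
                out.append((ad_id, count, end))
                last = count
            # Stop "re-registering the timer" once the ad is idle for good.
            if count == 0 and end > times[-1]:
                break
            end += slide

    out.sort(key=lambda r: (r[2], r[0]))
    return out


# Input events from the question, as seconds after 12:00:00.
clicks = [("ad1", 1), ("ad2", 2), ("ad3", 3),
          ("ad1", 61), ("ad2", 122), ("ad3", 183)]

for ad_id, count, end in counts_with_zeros(clicks):
    print(ad_id, count, f"12:{end // 60:02d}:{end % 60:02d}")
```

In an actual job the `while` loop would be replaced by an event-time timer that fires once per minute for each key that still has clicks in state, so idle ads cost nothing.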
