ApacheFlink—有没有一种方法可以定义一个动态表,其中包含最近没有被事件触及的条目?

qyyhg6bp  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(333)

我是flink的新手,我正在尝试使用它来获得我的应用程序的一堆实时视图。我想构建的动态视图中至少有一个是显示未满足sla或者基本上已过期的条目,其条件是进行简单的时间戳比较。所以我基本上希望一个条目显示在我的动态表中,如果它最近没有被某个事件触及的话。在dev环境中使用flink1.6(由于aws kinesis的限制),我没有看到flink正在重新评估一个条件,除非某个事件触及该条目。
我已经将我的开发环境插入到一个kinesis流中,该流正在从web服务器发送实时访问日志事件。这不是我真正的用例,但它很容易开始测试。我已经编写了一个简单的表查询,它会拉入一个请求路径,它的最后一次访问时间,并计算一个布尔标志来指示它是否在最后一分钟没有被访问。我通过连接到printsinkfunction的retract流对此进行调试,以便将所有更新/删除打印到我的控制台。

tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");

Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());

我希望当我访问一个页面时,一个add事件被发送到这个流。如果我等待1分钟(不执行任何操作),表中的case语句的值将为1,因此我应该看到一个delete,然后使用该标志集添加事件。
我实际上看到的是,在我再次加载该页面之前,什么都不会发生。delete事件实际上设置了标志,而紧接着的add事件再次清除了标志(因为它不再“过期”)。

// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event

编辑:我在搜索中遇到的最有用的技巧是创建一个processfunction。我认为这是我可以使用动态表来实现的(在某些情况下,我会使用中间流来查看计算的日期),但希望不必达到这个目的。
我已经使用了processfunction方法,但它需要比我最初想象的更多的修补:
我不得不在pojo中添加一个字段,该字段会在ontimer()方法中发生更改(可以是日期,也可以是您每次只需修改的版本)
我必须将此字段注册为动态表的一部分
我必须在查询中使用此字段,以便重新计算查询并更改布尔标志(即使我实际上没有使用新字段)。我只是在select子句中添加了它。

u91tlkcl

u91tlkcl1#

您的方法看起来很有希望,但是flink的表api/sql(yet)不支持将其与移动的“now”时间戳进行比较。
我将分两步解决这个问题。
在upsert模式下注册动态表,即每个键都upsert的表( request 在您的情况下)基于版本时间戳( requestTime 在你的情况下)。生成的动态表将保存每个请求的最新行。
使用一个简单的filter predicate 进行查询,该 predicate 比较动态(upsert)表的行的版本时间戳,并过滤掉所有时间戳太接近现在的行。
不幸的是,这两个特性(upsert转换和与移动的“now”时间戳的比较)在flink中都不可用。不过,upsert表转换还有一些正在进行的工作。

相关问题