我计划使用flink高级API对Kafka主题进行流处理,并执行滚动窗口,然后进行缩减和Map。对于Map函数,我使用了一个扩展RichMapFunction的自定义类。混淆与map类中的open()和close()函数有关。
调用这些函数的时间,在每个窗口结束前调用一次,或在每个flink任务开始时调用一次。
即:如果窗口为5分钟,这些函数是在窗口迭代前每5分钟调用一次,还是在每次flink任务启动时调用一次?
这是指向类定义的链接:https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/api/common/functions/RichFunction.html
文档中的语句让我很困惑“这个方法将在每个迭代superstep的开始被调用”,这到底是什么意思。
而且,写了open函数适合于一次性设置工作,但没有写close函数解释(因为适合于一次性清除工作)。
目的是在Flink作业中建立一个数据库连接。我应该在哪里建立连接?全局地作为map函数类的构造的一部分还是在open()函数中?我可以在哪里关闭连接?
提前感谢!
1条答案
按热度按时间bihw5rsg1#
本文档将帮助您了解何时调用
open()
:https://nightlies.apache.org/flink/flink-docs-stable/docs/internals/task_lifecycle/数据库连接应在
open()
中建立,并在close()
中关闭