我有一个springboot gradle项目,使用apache flink处理数据流信号。当一个新的信号通过数据流时,我想使用postgres数据库表中的一个id来查询look up(即findbyid())它的详细信息,postgres数据库表已经创建,以便获得关于信号的附加信息并丰富数据。我希望避免使用spring依赖项来执行查找(即autowire存储库),并希望继续使用flink实现来执行查找。
在哪里可以指定如何添加postgres连接配置信息,如端口、数据库、url、用户名、密码等(为了简单起见,我们可以假设postgres db在我的机器中是本地的)。它是否像将配置添加到application.properties文件一样简单?如果是这样的话,在按非主键值搜索时,如何编写查询方法在postgres表中查找记录?
一些在线资源建议使用这个框架代码,但我不确定/id如何适合我的用例(我创建了一个evententity模型,其中包含我正在查找的表中的所有参数/列)。 like so
```
public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {
// Declare DB connection & query statements
public void open(Configuration parameters) throws Exception {
//Initialize DB connection
//prepare query statements
}
@Override
public void flatMap(String value, Collector<EventEntity> out) throws Exception {
}
}
1条答案
按热度按时间cunj1qz11#
您的示例代码是正确的。您可以在中设置postgresql的所有自定义初始化和准备代码
open()
方法。然后您可以在flatMap()
功能。下面是redis操作的一个示例
我在这里使用了richaxync函数,我建议您按照最佳实践中建议的那样做。阅读此处了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html)
您可以在构造函数方法中传递配置参数,并在初始化过程中使用它