flink项目

6kkfgxo0  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(371)

我有一个springboot gradle项目,使用apache flink处理数据流信号。当一个新的信号通过数据流时,我想使用postgres数据库表中的一个id来查询look up(即findbyid())它的详细信息,postgres数据库表已经创建,以便获得关于信号的附加信息并丰富数据。我希望避免使用spring依赖项来执行查找(即autowire存储库),并希望继续使用flink实现来执行查找。
在哪里可以指定如何添加postgres连接配置信息,如端口、数据库、url、用户名、密码等(为了简单起见,我们可以假设postgres db在我的机器中是本地的)。它是否像将配置添加到application.properties文件一样简单?如果是这样的话,在按非主键值搜索时,如何编写查询方法在postgres表中查找记录?
一些在线资源建议使用这个框架代码,但我不确定/id如何适合我的用例(我创建了一个evententity模型,其中包含我正在查找的表中的所有参数/列)。 like so ```
public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {

    // Declare DB connection & query statements

    public void open(Configuration parameters) throws Exception {
        //Initialize DB connection
        //prepare query statements
    }

    @Override
    public void flatMap(String value, Collector<EventEntity> out) throws Exception {

    }
}
cunj1qz1

cunj1qz11#

您的示例代码是正确的。您可以在中设置postgresql的所有自定义初始化和准备代码 open() 方法。然后您可以在 flatMap() 功能。
下面是redis操作的一个示例
我在这里使用了richaxync函数,我建议您按照最佳实践中建议的那样做。阅读此处了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html)
您可以在构造函数方法中传递配置参数,并在初始化过程中使用它

public static class AsyncRedisOperations extends RichAsyncFunction<Object,Object> {

        private JedisPool jedisPool;
        private Configuration redisConf;

        public AsyncRedisOperations(Configuration redisConf) {
          this.redisConf = redisConf;
        } 

        @Override
        public void open(Configuration parameters) {

          JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
          jedisPoolConfig.setMaxTotal(this.redisConf.getInteger("pool", 8));
          jedisPoolConfig.setMaxIdle(this.redisConf.getInteger("pool", 8));
          jedisPoolConfig.setMaxWaitMillis(this.redisConf.getInteger("maxWait", 0));

          JedisPool jedisPool = new JedisPool(jedisPoolConfig,
            this.redisConf.getString("host", "192.168.10.10"),
            this.redisConf.getInteger("port", 6379), 5000);

          try {
            this.jedisPool = jedisPool;
            this.logger.info("Redis connected: " + jedisPool.getResource().isConnected());
          } catch (Exception e) {
            this.logger.error(BaseUtil.append("Exception while connecting Redis"));
          }

        }

        @Override
        public void asyncInvoke(Object in, ResultFuture<Object> out) {

          try (Jedis jedis = this.jedisPool.getResource()) {
            String key = jedis.get(key);
            this.logger.info("Redis Key: " + key);
          } 

        }
    }

相关问题