flink:如何在关系数据库上执行连续查询

xvw2m8pv  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(263)

您好,我想从关系数据库中连续读取数据以获取新数据。我编写了代码,但它只执行了一次select查询,没有重复执行。迭代器只迭代结果流,但不多次执行查询。

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(JDBCConfig.DRIVER_CLASS)
            .setDBUrl(JDBCConfig.DB_URL)
            .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
            .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

    SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
            .keyBy(0)
            .fold(null, new FoldFunction<Row, Row>(){
                @Override
                public Row fold(Row row1, Row row) throws Exception {
                    Date dt = (Date) row.getField(2);
                    return row;
                }
            });

    IterativeStream<Row> iteration = source.iterate();
    iteration.closeWith(iteration.filter(new FilterFunction<Row>() {
        @Override
        public boolean filter(Row row) throws Exception {
            if(Integer.parseInt(row.getField(0).toString()) > 0) {
                return true;
            }
            return false;
        }
    }));

    //iteration.print();

    source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername(JDBCConfig.DRIVER_CLASS)
            .setDBUrl(JDBCConfig.DB_URL)
            .setQuery(JDBCConfig.INSERT_SQL)
            .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP_WITH_TIMEZONE})
            .finish());

    environment.execute();

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题