您好,我想从关系数据库中连续读取数据以获取新数据。我编写了代码,但它只执行了一次select查询,没有重复执行。迭代器只迭代结果流,但不多次执行查询。
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(JDBCConfig.DRIVER_CLASS)
.setDBUrl(JDBCConfig.DB_URL)
.setQuery(JDBCConfig.SELECT_FROM_SOURCE)
.setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);
SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
.keyBy(0)
.fold(null, new FoldFunction<Row, Row>(){
@Override
public Row fold(Row row1, Row row) throws Exception {
Date dt = (Date) row.getField(2);
return row;
}
});
IterativeStream<Row> iteration = source.iterate();
iteration.closeWith(iteration.filter(new FilterFunction<Row>() {
@Override
public boolean filter(Row row) throws Exception {
if(Integer.parseInt(row.getField(0).toString()) > 0) {
return true;
}
return false;
}
}));
//iteration.print();
source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(JDBCConfig.DRIVER_CLASS)
.setDBUrl(JDBCConfig.DB_URL)
.setQuery(JDBCConfig.INSERT_SQL)
.setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP_WITH_TIMEZONE})
.finish());
environment.execute();
暂无答案!
目前还没有任何答案,快来回答吧!