我想在processfunction中使用flink异步i/o执行cassandra查找。现在,当我在main方法中执行此操作时,结果将作为数据流返回,如下所示-
public static void main(String args[]) throws Exception {
ConnectionProps props = new ConnectionProps(args, "props.properties");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Tuple2<String, String> sample1 = new Tuple2<>("flink", "flink-user-1");
Tuple2<String, String> sample2 = new Tuple2<>("flink", "flink-user-2");
DataStream<Tuple2<String, String>> samples = env.fromElements(sample1, sample2);
//Result will be in the form of <query, resultset.toString()>
DataStream<Tuple2<String, String>> lookupResult = AsyncDataStream
.orderedWait(samples, new AsyncCassandraReader(props),
5, TimeUnit.SECONDS,
props.getAsyncIoCapacity());
lookupResult.print();
}
在哪里 AsyncCassandraReader
有一个 asyncInvoke
方法,该方法基于 DataStream<Tuple2<String, String>>
通过。
我如何在一个房间里做同样的事情 ProcessFunction
?
暂无答案!
目前还没有任何答案,快来回答吧!