java—如何在processfunction中使用flink异步i/o执行异步cassandra查找?

dtcbnfnu  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(176)

我想在processfunction中使用flink异步i/o执行cassandra查找。现在,当我在main方法中执行此操作时,结果将作为数据流返回,如下所示-

public static void main(String args[]) throws Exception {
    ConnectionProps props = new ConnectionProps(args, "props.properties");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Tuple2<String, String> sample1 = new Tuple2<>("flink", "flink-user-1");
    Tuple2<String, String> sample2 = new Tuple2<>("flink", "flink-user-2");

    DataStream<Tuple2<String, String>> samples = env.fromElements(sample1, sample2);

    //Result will be in the form of <query, resultset.toString()>
    DataStream<Tuple2<String, String>> lookupResult = AsyncDataStream
                .orderedWait(samples, new AsyncCassandraReader(props), 
                             5, TimeUnit.SECONDS, 
                             props.getAsyncIoCapacity());

    lookupResult.print();
}

在哪里 AsyncCassandraReader 有一个 asyncInvoke 方法,该方法基于 DataStream<Tuple2<String, String>> 通过。
我如何在一个房间里做同样的事情 ProcessFunction ?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题