利用nifi按需从flink获取数据

r1wp621o  于 2021-06-26  发布在  Flink
关注(0)|答案(0)|浏览(187)

我有一个场景,在nifi的帮助下使用casandra的数据在flink内部使用,我可以通过连接casandra表“selectfromtablename”使用nifi模板获取数据。但是,我需要控制从flink获取数据,而不是按照给定的计划间隔获取数据。有没有可能是“selectfrom tablename where fieldname=“given name from flink”。使用flink有可能吗?

public class NiFiSourceTopologyExample {

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8080/nifi")
            .portName("CasandraOut")
            .requestBatchCount(5)
            .buildConfig();

    try{

    SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
    DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource);

    DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
        @Override
        public String map(NiFiDataPacket value) throws Exception {
            return new String(value.getContent(), Charset.defaultCharset());
        }
    });

    dataStream.print();
    env.execute();
    }catch(Exception e)
    {
        System.out.println("Error->"+e.getMessage());
    }

}

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题