我有一个场景,在nifi的帮助下使用casandra的数据在flink内部使用,我可以通过连接casandra表“selectfromtablename”使用nifi模板获取数据。但是,我需要控制从flink获取数据,而不是按照给定的计划间隔获取数据。有没有可能是“selectfrom tablename where fieldname=“given name from flink”。使用flink有可能吗?
public class NiFiSourceTopologyExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("CasandraOut")
.requestBatchCount(5)
.buildConfig();
try{
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource);
DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
@Override
public String map(NiFiDataPacket value) throws Exception {
return new String(value.getContent(), Charset.defaultCharset());
}
});
dataStream.print();
env.execute();
}catch(Exception e)
{
System.out.println("Error->"+e.getMessage());
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!