我是Apache·Flink的新手。我有一个flink scala项目,它使用kafka集群中的数据,我需要将流结果作为参数传递给使用api的用户,该api返回经过转换的流。这是我的密码
class Testing {
def main(args: Array[String]): Unit = {}
def streamTest(): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
consumer_test.setStartFromEarliest()
val stream = env.addSource(consumer_test).setParallelism(5)
val api_test = "http://api-test.server.local/test/?msg=%s"
// Here I need pass stream as parameter to api and return transformed stream
env.execute()
}
}
有什么帮助吗?
2条答案
按热度按时间gmol16391#
这是我最后的密码。我希望这有帮助
jobtbby32#
您应该使用熟悉的任何http/rest库,然后使用
asyncIO
.