我有一个spring启动应用程序,并与apacheflink集成。我想从Kafka系统中读取数据,并将它们暴露到rest端点。
下面是我的简单数据,
@GetMapping("/details/{personName}")
public String getPersonDetails() throws Exception {
StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "group_id");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
logger.info(value);
return value;
}
}).print();
env.execute();
return "hello world";
}
我的问题是,
我的Kafka返回如下字符串值,
"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
如何通过应用过滤器将其转换为json来返回。例如,如果rest调用的输入是“john”,那么我希望对它们进行分组,对权重值求和,并以json(仅名称和权重)的形式返回。
第二个问题,我无法停止执行环境。有别的选择吗?我查了Flink的文件,我没有得到任何关于我的情况。
第三个问题,我想保持在环境是急于加载,试图使用静态块,但它需要更多的时间也。
国家森林资源:
我在Kafka有大量的数据,所以想扩展和快速处理。
1条答案
按热度按时间yyyllmsg1#
听起来您可能需要花更多的时间来查看flink文档。但简而言之。。。
添加
MapFunction
它将字符串解析为json,提取名称和权重,并将其输出为tuple2<string,integer>或某个自定义java类。执行groupby(name字段),后跟
ProcessFunction
它将重量相加并保存在状态中。使用
QueryableState
向作为程序main()方法一部分运行的代码公开状态(求和的权重)。在main方法中,实现一个使用
QueryableStateClient
得到一个名字的重量。