我已经创建了一个springboot代码来使用API,以便在用户到达端点时获取flink结果。static Map<String, Long> finalList = new HashMap<String, Long>();
以上是main方法外的类变量,在main方法内,使用了一个用于flink的datastream API,通过KafkaSource类从Kafka中获取数据,并在flink中执行聚合,数据流被放入data_15min变量,该变量是DataStream〈Map〈String,Long〉〉的一个对象。
我使用了一个map函数从数据流中获取数据,并将它们放入上面的finalList对象中。
data_15min.map(new MapFunction<Map<String, Long>, Object>() {
ObjectMapper objMapperNew = new ObjectMapper();
@Override
public Object map(Map<String, Long> value) throws Exception {
System.out.println("Value in map: " + value);
System.out.println("KeySet: " + value.keySet());
value.forEach((key, val) -> {
if(!dcListNew.contains(key)) {
dcListNew.add(key);
finalList.put(key, val);
} else {
finalList.replace(key, val);
}
});
System.out.println("DC List NEw inside: " + dcListNew);
System.out.println("Final List inside: " + objMapperNew.writeValueAsString(finalList));
return finalList;
}
});
System.out.println("Final List: " + finalList);
在上面的map函数中打印finalList时,我得到的数据为Final List inside: {"ABC": 2, "AXY": 10}
在Map之外打印时,我仍将其视为:Final List: {}
有人能帮我一下吗,这样我就可以在map方法之外使用这些数据,并将其发送到springboot API response?
1条答案
按热度按时间brjng4g31#
我认为这不起作用的主要原因是因为并发性,我假设您正在您的环境中的某个地方调用
execute()
,所以只有在execute()
完成之后,您的finalList
才已经填充了记录。注意,这样工作一点都不安全,很可能会在短时间内导致错误.总的来说,解决方案的架构似乎缺少了一些东西,比如数据库,你可以在那里存储从Flink得到的结果,然后从Spring访问数据库.