使用Springboot API获取Apache Flink结果

qv7cva1a  于 2023-01-20  发布在  Spring
关注(0)|答案(1)|浏览(119)

我已经创建了一个springboot代码来使用API,以便在用户到达端点时获取flink结果。
static Map<String, Long> finalList = new HashMap<String, Long>();
以上是main方法外的类变量,在main方法内,使用了一个用于flink的datastream API,通过KafkaSource类从Kafka中获取数据,并在flink中执行聚合,数据流被放入data_15min变量,该变量是DataStream〈Map〈String,Long〉〉的一个对象。
我使用了一个map函数从数据流中获取数据,并将它们放入上面的finalList对象中。

data_15min.map(new MapFunction<Map<String, Long>, Object>() {
                ObjectMapper objMapperNew = new ObjectMapper();
                @Override
                public Object map(Map<String, Long> value) throws Exception {
                    System.out.println("Value in map: " + value);
                    System.out.println("KeySet: " + value.keySet());
                    value.forEach((key, val) -> {

                        if(!dcListNew.contains(key)) {
                            dcListNew.add(key);
                            finalList.put(key, val);

                        } else {
                            finalList.replace(key, val);
                        }
                    });

                    System.out.println("DC List NEw inside: " + dcListNew);
                    System.out.println("Final List inside: " + objMapperNew.writeValueAsString(finalList));
                    return finalList;
                }
});
System.out.println("Final List: " + finalList);

在上面的map函数中打印finalList时,我得到的数据为
Final List inside: {"ABC": 2, "AXY": 10}
在Map之外打印时,我仍将其视为:Final List: {}
有人能帮我一下吗,这样我就可以在map方法之外使用这些数据,并将其发送到springboot API response?

brjng4g3

brjng4g31#

我认为这不起作用的主要原因是因为并发性,我假设您正在您的环境中的某个地方调用execute(),所以只有在execute()完成之后,您的finalList才已经填充了记录。
注意,这样工作一点都不安全,很可能会在短时间内导致错误.总的来说,解决方案的架构似乎缺少了一些东西,比如数据库,你可以在那里存储从Flink得到的结果,然后从Spring访问数据库.

相关问题