flink数据流转换和向rest端点公开

lstz6jyr  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(413)

我有一个spring启动应用程序,并与apacheflink集成。我想从Kafka系统中读取数据,并将它们暴露到rest端点。
下面是我的简单数据,

@GetMapping("/details/{personName}")
    public String getPersonDetails() throws Exception {

        StreamExecutionEnvironment env =  LocalStreamEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "group_id");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
                new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String map(String value) throws Exception {
                logger.info(value);
                return value;
            }
        }).print();
        env.execute();

        return "hello world";

    }

我的问题是,
我的Kafka返回如下字符串值,

"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}

如何通过应用过滤器将其转换为json来返回。例如,如果rest调用的输入是“john”,那么我希望对它们进行分组,对权重值求和,并以json(仅名称和权重)的形式返回。
第二个问题,我无法停止执行环境。有别的选择吗?我查了Flink的文件,我没有得到任何关于我的情况。
第三个问题,我想保持在环境是急于加载,试图使用静态块,但它需要更多的时间也。
国家森林资源:
我在Kafka有大量的数据,所以想扩展和快速处理。

yyyllmsg

yyyllmsg1#

听起来您可能需要花更多的时间来查看flink文档。但简而言之。。。
添加 MapFunction 它将字符串解析为json,提取名称和权重,并将其输出为tuple2<string,integer>或某个自定义java类。
执行groupby(name字段),后跟 ProcessFunction 它将重量相加并保存在状态中。
使用 QueryableState 向作为程序main()方法一部分运行的代码公开状态(求和的权重)。
在main方法中,实现一个使用 QueryableStateClient 得到一个名字的重量。

相关问题