如何在flink程序中逐行阅读Kafka的主题

soat7uwm  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(322)

首先,我在kafka topic中加载一个csv文件,然后我可以在中通过flink程序打印这个主题。代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", 
     "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
    prop.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 
     ("flinkTopic", new SimpleStringSchema(),prop);
    myConsumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();
    env.execute("Flink Streaming Java API Skeleton");

我的问题是我想一行一行地读这个主题,并分别处理每一行,你能指导我如何一行一行地读Kafka的主题吗?
任何帮助都将不胜感激。

j91ykkif

j91ykkif1#

关于您可能会做什么的示例,我建议您通过在线apache flink培训来完成。可以使用filter、map、flatmap、windows和processfunctions等操作逐行处理流。
您可能想知道如何方便地使用csv数据。最简单的方法是使用table/sqlapi,它有自己的kafka连接器和csv格式。
不使用flink的sql引擎,就可以实现一个map函数,将每行文本转换成pojo。这里有一个例子。或者实现您自己的反序列化程序,而不是使用simplestringschema。

相关问题