How do I integrate Apache Flink with Kafka?

qxgroojn  posted on 2021-06-07  in  Kafka

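I wrote a Java program that streams events from a Kafka producer into Flink and prints them. The job runs on the cluster, but no messages appear in the console. Here is my Java code, which reads from Kafka and prints to the console: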

package com.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // parse user parameters; the job is always run with:
        // --topic test --bootstrap.servers 192.168.3.152:9092 --zookeeper.connect 192.168.3.152:2181 --group.id group_test
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        }).print();

        env.execute();
    }
}
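
To make it easier to check what the consumer actually receives, the `--key value` arguments listed in the code comment can be thought of as a plain `java.util.Properties` object. The following is only a minimal plain-Java sketch of that idea, not Flink's `ParameterTool` implementation; the class `ArgsToProperties` and its `parse` method are hypothetical names introduced for illustration.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink): mimics the general idea of
// turning "--key value" command-line pairs into a Properties object.
public class ArgsToProperties {

    static Properties parse(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i < args.length; i++) {
            // Every key is expected to be prefixed with "--".
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key, got: " + args[i]);
            }
            // Every key must be followed by a value.
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for: " + args[i]);
            }
            String key = args[i].substring(2);
            props.setProperty(key, args[++i]);
        }
        return props;
    }

    public static void main(String[] args) {
        // The same arguments as in the question's code comment.
        String[] example = {
            "--topic", "test",
            "--bootstrap.servers", "192.168.3.152:9092",
            "--zookeeper.connect", "192.168.3.152:2181",
            "--group.id", "group_test"
        };
        Properties props = parse(example);
        // Print each key/value pair so a missing or misspelled key is easy to spot.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

Printing the parsed pairs like this before submitting the job is one way to confirm that `topic`, `bootstrap.servers`, `zookeeper.connect`, and `group.id` are all present and spelled as expected.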
