flink kafka-如何使应用程序并行运行?

ql3eal8s  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(517)

我正在flink中创建一个应用程序
阅读主题中的消息
做一些简单的处理
将结果写入其他主题
我的代码可以工作,但是它不能并行运行
我该怎么做?
我的代码似乎只在一个线程/块上运行?
在flink web Jmeter 板上:
应用进入运行状态
但是,在overview子任务中只显示了一个块
和接收/发送的字节数,接收/发送的记录始终为零(无更新)
这是我的代码,请帮助我学习如何分割我的应用程序,以便能够并行运行,我写的应用程序正确吗?

public class SimpleApp {

    public static void main(String[] args) throws Exception {

        // create execution environment INPUT
        StreamExecutionEnvironment env_in  =    
                 StreamExecutionEnvironment.getExecutionEnvironment();
        // event time characteristic
        env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // production Ready (Does NOT Work if greater than 1)
        env_in.setParallelism(Integer.parseInt(args[0].toString()));

        // configure kafka consumer
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("auto.offset.reset", "earliest");

        // create a kafka consumer
        final DataStream<String> consumer = env_in
                .addSource(new FlinkKafkaConsumer09<>("test", new   
                            SimpleStringSchema(), properties));

        // filter data
        SingleOutputStreamOperator<String> result = consumer.filter(new  
            FilterFunction<String>(){
            @Override
            public boolean filter(String s) throws Exception {
                return s.substring(0, 2).contentEquals("PS");
            }
        });

        // Process Data
        // Transform String Records to JSON Objects
        SingleOutputStreamOperator<JSONObject> data = result.map(new 
                MapFunction<String, JSONObject>()
        {
            @Override
            public JSONObject map(String value) throws Exception
            {
                JSONObject jsnobj = new JSONObject();

                if(value.substring(0, 2).contentEquals("PS"))
                {
                    // 1. Raw Data
                    jsnobj.put("Raw_Data", value.substring(0, value.length()-6));

                    // 2. Comment
                    int first_index_comment = value.indexOf("$");
                    int last_index_comment  = value.lastIndexOf("$") + 1;
                    //   - set comment
                    String comment          =  
                    value.substring(first_index_comment, last_index_comment);
                    comment = comment.substring(0, comment.length()-6);
                    jsnobj.put("Comment", comment);
                }
                else {
                    jsnobj.put("INVALID", value);
                }

                return jsnobj;
            }
        });

        // Write JSON to Kafka Topic
        data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
                "FilteredData",
                new SimpleJsonSchema()));

        env_in.execute();
    }
}

我的代码确实可以工作,但它似乎只在web界面中的单个线程(显示了一个块)上运行(没有数据传递,因此发送/接收的字节不会更新)。
如何使它并行运行?

eqzww0vc

eqzww0vc1#

要并行运行您的工作,您可以做两件事:
在env级别增加工作的并行性-例如
streamexecutionenvironment env_in=streamexecutionenvironment.getexecutionenvironment().setparallelism(4);
但这只会在flink端读取数据后增加并行性,因此如果源代码生成数据的速度更快,则可能无法充分利用它。
要完全并行化作业,请为kafka主题设置多个分区,理想情况下是flink作业所需的并行量。因此,在创建Kafka主题时,您可能需要执行以下操作:
bin/kafka-topics.sh--创建--缩放器localhost:2181 --replication-factor 3--分区4--主题测试

相关问题