Finishing the Flink program after all Kafka consumers finish

j8ag8udp, asked on 2021-06-04, in Kafka
Follow (0) | Answers (1) | Views (555)

I have set up a minimal example here with N streams over N Kafka topics (N = 100 in the example below).
I want to finish each stream when it sees an "EndofStream" message, and when all streams have finished, I want the Flink program to exit gracefully.
This works when parallelism is set to 1, but generally it does not.
From another question, it appears that not all threads of the Kafka consumer group terminate.
Others have suggested throwing an exception. However, the program then terminates at the first exception and does not wait for all streams to finish.
I have also added a minimal Python program that feeds messages into the Kafka topics, for reproducibility. Please fill in <IP>:<PORT> in each program.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String outputPath = "file://" + System.getProperty("user.dir") + "/out/output";

Properties kafkaProps = new Properties();
String brokers = "<IP>:<PORT>";
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("auto.offset.reset", "earliest");

ArrayList<FlinkKafkaConsumer<String>> consumersList = new ArrayList<FlinkKafkaConsumer<String>>();
ArrayList<DataStream<String>> streamList = new ArrayList<DataStream<String>>();

for (int i = 0; i < 100; i++) {
    consumersList.add(new FlinkKafkaConsumer<String>(Integer.toString(i),
            new SimpleStringSchema() {
                @Override
                public boolean isEndOfStream(String nextElement) {
                    if (nextElement.contains("EndofStream")) {
                        // throw new RuntimeException("End of Stream");
                        return true;
                    } else {
                        return false;
                    }
                }
            },
            kafkaProps));
    consumersList.get(i).setStartFromEarliest();
    streamList.add(env.addSource(consumersList.get(i)));
    streamList.get(i).writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
}

// execute program
env.execute("Flink Streaming Java API Skeleton");
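The end-of-stream detection above boils down to a substring check: any record containing "EndofStream" marks its topic's stream as finished for that consumer. A minimal, Flink-free sketch of that predicate (the class name EndOfStreamCheck is ours, purely for illustration):

```java
// Stand-alone illustration of the isEndOfStream predicate used above.
public class EndOfStreamCheck {

    // Mirrors the overridden SimpleStringSchema.isEndOfStream: any record
    // containing the marker substring ends the stream for that consumer.
    public static boolean isEndOfStream(String nextElement) {
        return nextElement.contains("EndofStream");
    }

    public static void main(String[] args) {
        System.out.println(isEndOfStream("Message: 5 going on channel: 3")); // false
        System.out.println(isEndOfStream("EndofStream on channel: 3"));      // true
    }
}
```

Because this is a contains check, ordinary data records must never embed the marker string, or the stream would finish early.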

Python 3 program

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='<IP>:<PORT>')

for i in range(100): # Channel Number
    for j in range(100): # Message Number
        message = "Message: " + str(j) + " going on channel: " + str(i)
        producer.send(str(i), str.encode(message))

    message = "EndofStream on channel: " + str(i)
    producer.send(str(i), str.encode(message))

producer.flush()

Changing the line streamList.add(env.addSource(consumersList.get(i))); to streamList.add(env.addSource(consumersList.get(i)).setParallelism(1)); also does the job, but then Flink places all the consumers on the same physical machine.
I would like the consumers to be distributed as well.
flink-conf.yaml:

parallelism.default: 2
cluster.evenly-spread-out-slots: true
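Besides this configuration, one way to keep single-parallelism sources from being packed into the same slot is to give each source its own slot sharing group. This is a sketch only, not verified against a cluster; it assumes the same consumersList and streamList as in the question's code:

```java
for (int i = 0; i < 100; i++) {
    // Each source gets parallelism 1 and its own slot sharing group, so it
    // cannot share a slot with the other sources; combined with
    // cluster.evenly-spread-out-slots, the slots are spread across
    // task managers rather than piling up on one machine.
    streamList.add(env.addSource(consumersList.get(i))
            .setParallelism(1)
            .slotSharingGroup("source-" + i));
}
```

Note that separate slot sharing groups increase the total number of slots the job requires, so the cluster must provide at least one slot per source.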

A last resort would be to write each topic to a separate file and use the files as sources instead of the Kafka consumers.
The end goal is to measure the time Flink needs to process certain workloads with certain programs.


fruv7luv1#

Use the cancel method from FlinkKafkaConsumer's parent class, FlinkKafkaConsumerBase.
public void cancel() — Description copied from interface: SourceFunction. Cancels the source. Most sources will have a while loop inside the SourceFunction.run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called. A typical pattern is to have a "volatile boolean isRunning" flag that is set to false in this method. That flag is checked in the loop condition.
When a source is canceled, the executing thread will also be interrupted (via Thread.interrupt()). The interruption happens strictly after this method has been called, so any interruption handler can rely on the fact that this method has completed. It is good practice to make any flags altered by this method "volatile", in order to guarantee the visibility of the effects of this method to any interruption handler.
Specified by: cancel in interface SourceFunction
You are right: it is necessary to use the SimpleStringSchema. This is based on this answer: https://stackoverflow.com/a/44247452/2096986. Check the example below. First I sent the message "Flink code we saw also works in a cluster" and the Kafka consumer consumed it. Then I sent "SHUTDOWNDDDDDDD", which also had no effect on finishing the stream. Finally, I sent "SHUTDOWN" and the stream job finished. See the logs below the program.

package org.sense.flink.examples.stream.kafka;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerQuery {

    public KafkaConsumerQuery() throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(java.util.regex.Pattern.compile("test"),
                new MySimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();

        System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
        env.execute(KafkaConsumerQuery.class.getSimpleName());
    }

    private static class MySimpleStringSchema extends SimpleStringSchema {
        private static final long serialVersionUID = 1L;
        private final String SHUTDOWN = "SHUTDOWN";

        @Override
        public String deserialize(byte[] message) {

            return super.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            if (SHUTDOWN.equalsIgnoreCase(nextElement)) {
                return true;
            }
            return super.isEndOfStream(nextElement);
        }
    }

    public static void main(String[] args) throws Exception {
        new KafkaConsumerQuery();
    }
}
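Note the difference from the question's schema: here the marker must match the whole record (case-insensitively), which is why "SHUTDOWNDDDDDDD" in the log below did not finish the job while "SHUTDOWN" did. A small stand-alone check of that predicate (the class name ShutdownCheck is ours, purely for illustration):

```java
// Stand-alone illustration of MySimpleStringSchema.isEndOfStream.
public class ShutdownCheck {

    private static final String SHUTDOWN = "SHUTDOWN";

    // Exact, case-insensitive match: only a record that IS the marker
    // (not one that merely contains it) ends the stream.
    public static boolean isEndOfStream(String nextElement) {
        return SHUTDOWN.equalsIgnoreCase(nextElement);
    }

    public static void main(String[] args) {
        System.out.println(isEndOfStream("SHUTDOWNDDDDDDD")); // false: not an exact match
        System.out.println(isEndOfStream("shutdown"));        // true: case-insensitive
    }
}
```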

Logs:

2020-07-02 16:39:59,025 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-8, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
3> Flink code we saw also works in a cluster. To run this code in a cluster
3> SHUTDOWNDDDDDDD
2020-07-02 16:40:27,973 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.
2020-07-02 16:40:27,973 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e).
2020-07-02 16:40:27,974 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) [FINISHED]
2020-07-02 16:40:27,975 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (3/4) 5f47c2b3f55c5eb558484d49fb1fcf0e.
2020-07-02 16:40:27,979 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.
