flink kafka消费群ID不工作

4c8rllxm  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(672)

我在用Kafka和Flink。在一个简单的程序中,我使用了flinks-flinkkafkaconsumer09,将组id分配给它。
根据kafka的行为,当我用相同的group.id在同一主题上运行两个消费者时,它应该像一个消息队列一样工作。我认为它应该是这样工作的:如果2条消息发送到Kafka,flink程序中的每一条或其中一条将处理这2条消息两次(假设总共有2行输出)。
但实际结果是,每个程序将接收2条消息。
我尝试使用kafka服务器下载附带的客户机。它以文档化的方式工作(处理2条消息)。
我尝试在flink程序的同一个主要功能中使用两个kafka消费者。共处理4条消息。
我还试着运行了2个flink示例,并为每个示例分配了相同的kafka消费程序。4条信息。
有什么想法吗?这是我期望的输出:

1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66

以下是我经常得到的错误输出:

1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66

下面是代码段:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink1 says: " + value;
        }
    }).print();

    env.execute();
}

我试过两次运行它,也试过另一种方式:为main函数中的每个数据流创建2个datastream和env.execute()。

uubf1zoe

uubf1zoe1#

今天在flink用户邮件列表上有一个非常类似的问题,但是我找不到在这里发布的链接。下面是答案的一部分:
“在内部,flink-kafka连接器不使用使用者组管理功能,因为它们在每个并行示例上使用较低级别的API(0.8中的simpleconsumer和0.9中的kafkaconsumer#assign(…)),以便更好地控制单个分区的使用。因此,实际上,flink-kafka连接器中的“group.id”设置仅用于将偏移提交回zk/kafka代理。”
也许这能为你澄清一些事情。
此外,还有一篇关于与Flink和Kafka合作的博客文章,可能会对你有所帮助(https://data-artisans.com/blog/kafka-flink-a-practical-how-to).

2ledvvac

2ledvvac2#

因为除了向zookeeper提交补偿之外,flink kafka消费者的group.id没有太多的用途。对于flink kafka消费者而言,是否有任何补偿监控方法。我发现有一种方法[在消费者团体/消费者补偿检查器的帮助下]适用于游戏机消费者,但不适用于flink kafka消费者。
我们想看看我们的flink kafka消费者是如何落后于/落后于kafka主题大小[在给定时间点主题中的消息总数],最好是在分区级别。

相关问题