如何检查Kafka消费者是否准备好了

w6lpcovy  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(415)

我已将kafka提交策略设置为最新且缺少前几条消息。如果我在开始向输入主题发送消息之前先休眠20秒,那么一切都正常工作。我不确定问题是否出在消费者花很长时间重新平衡分区上。有没有办法在开始投票前知道消费者是否准备好了?

t40tm48m

t40tm48m1#

您可以执行以下操作:
我有一个测试,读取Kafka主题的数据。
所以您不能在多线程环境中使用kafkaconsumer,但是您可以传递参数“atomicreference assignment”,在使用者线程中更新它,并在另一个线程中读取它。
例如,在项目中截取工作代码进行测试:

private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // print the topic name
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (assignments.size() > 0) {
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            needStop.set(true);
            break;
        }
    }
    System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}

private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {

    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    System.out.println("Subscribed to topic: " + readFromKafka);
    consumer.subscribe(Collections.singletonList(readFromKafka));

    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        assignment.set(consumer.assignment());
        ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
        events.addAll(CommonUtils4Tst.readEvents(records));

        if (readTimeout == -1) {
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }

    needStop.set(true);

    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}

附笔
needstop-全局标志,在成功失败的情况下停止所有正在运行的线程(如果有的话)
事件-我要检查的对象的列表
readtimeout—在读取所有数据之前等待的时间,如果readtimeout==-1,则在读取任何数据时停止

6qqygrtg

6qqygrtg2#

如果您的策略设置为“最新”(如果没有以前提交的偏移量,则该策略将生效),但是您没有以前提交的偏移量,那么您不应该担心“丢失”消息,因为您告诉kafka不要关心“以前”发送给您的消费者的消息是否已准备就绪。
如果您关心“以前”的消息,则应将策略设置为“最早”。
在任何情况下,无论政策如何,您看到的行为都是暂时的,即一旦承诺的补偿在Kafka中保存,每次重新启动时,消费者都会从之前离开的地方重新开始

kulphzqa

kulphzqa3#

多亏了亚历克赛(我也投了赞成票),我似乎基本上按照同样的想法解决了我的问题。
只是想分享我的经验。。。在我们的例子中,我们以请求&响应的方式使用kafka,有点像rpc。正在发送一个主题的请求,然后等待另一个主题的响应。遇到类似的问题,即错过第一个响应。
我试过了 ... KafkaConsumer.assignment(); 反复地(用 Thread.sleep(100); )但似乎没有帮助。添加 KafkaConsumer.poll(50); 似乎已经启动了消费者(群体),也收到了第一React。测试了几次,现在一直有效。
顺便说一句,测试需要停止应用程序&删除kafka主题,并重新启动kafka。
附言:只是打电话 poll(50); 没有 assignment(); 像alexey提到的那样,获取逻辑可能不能保证消费者(组)已经准备好了。

xdnvmnnf

xdnvmnnf4#

你可以用 consumer.assignment() ,它将返回一组分区,并验证是否为该主题分配了所有可用的分区。
如果您使用的是springkafka项目,那么可以包含springkafka测试依赖性,并使用下面的方法等待主题分配,但是您需要有容器。 ContainerTestUtils.waitForAssignment(Object container, int partitions);

相关问题