Kafka消费偏移提交未知成员id

yhuiod9q  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(250)

在我清楚地解释这个问题之前,我想和大家分享一下房产。
以下是我使用过的producer属性:

bootstrap.servers=xyz:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0 key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true

消费者财产:

bootstrap.servers=ec2-54-218-85-12.us-west-2.compute.amazonaws.com:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=10000
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152

生产者.java

int numOfmessages = Integer.valueOf(args[1]);
    // set up the producer
    KafkaProducer<String, String> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        producer = new KafkaProducer<>(properties);
    }

    try {
        for (int i = 0; i < numOfmessages; i++) {
            String message = "message number " +i;
            // send lots of messages
            producer.send(new ProducerRecord<String, String>("fast-messages", message ));
            logger.info("sent message "+message);
        }
    } catch (Throwable throwable) {
        System.out.printf("%s", throwable.getStackTrace());
    } finally {
        producer.close();
    }

消费者.java

KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        consumer = new KafkaConsumer<>(properties);
    }

    try {
        consumer.subscribe(Arrays.asList("fast-messages"));
        int timeouts = 0;
        //noinspection InfiniteLoopStatement
        while (true) {
            // read records with a short timeout. If we time out, we don't really care.
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() == 0) {
                timeouts++;
            } else {
                logger.info("Got %d records after %d timeouts\n", records.count(), timeouts);
                timeouts = 0;
            }
            for (ConsumerRecord<String, String> record : records) {

                logger.info("consumed "+record.value());

                logger.info("doing some complex operation in consumer with "+record.value());

                for(int i= 0;i <999999999;i++) {

                    for(int j= 0;j <999999999;j++) {

                    }

                }

            }
        }
    }finally {
        consumer.close();
    }

使用上述属性和代码,当我运行producer时,它可以安全地发送所有消息。在使用者方面,我可以使用所有消息,但是当发生偏移提交时,它会失败,并出现以下错误。
2016-11-04 09:55:08信息摘要ordinator:540 - 标记协调员2147483647死亡。2016-11-04 09:55:08错误用户ordinator:544 - 提交组测试2016-11-04 09:55:08的偏移量时出现未知\u成员\u id错误警告consumercoordinator:418 - 自动偏移提交失败:由于组重新平衡2016-11-04 09:55:09错误consumerco,无法完成提交ordinator:544 - 执行时出现未知\u成员\u id错误提交组测试偏移量2016-11-04 09:55:09警告consumercoordinator:439 - 自动偏移提交失败:2016-11-04 09:55:09 info abstractcoordinator:361 - 由于未知成员id,尝试加入组测试失败,正在重置并重试。
我有点理解这个问题,它的失败是因为我们在消费者内部进行了复杂的操作。对如何处理这个问题有什么建议吗?这一定是一个非常常见的场景,只是想了解我们是否需要更改任何配置等。。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题